This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c8db7c8442c [fix][broker] call ServerCnx#closeProducer from correct 
thread (#20747)
c8db7c8442c is described below

commit c8db7c8442c355d2d95941a65140841af558612b
Author: Michael Marshall <[email protected]>
AuthorDate: Fri Jul 7 14:33:08 2023 -0500

    [fix][broker] call ServerCnx#closeProducer from correct thread (#20747)
    
    (cherry picked from commit 0bac873cba1b39f888fe359368217e8ac030f29a)
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 888668e15b1..130d312c2a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1611,7 +1611,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }
 
             producers.remove(producerId, producerFuture);
-        }).exceptionally(ex -> {
+        }).exceptionallyAsync(ex -> {
             if (ex.getCause() instanceof 
BrokerServiceException.TopicMigratedException) {
                 Optional<ClusterUrl> clusterURL = 
getMigratedClusterUrl(service.getPulsar());
                 if (clusterURL.isPresent()) {
@@ -1652,7 +1652,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         BrokerServiceException.getClientErrorCode(ex), 
ex.getMessage());
             }
             return null;
-        });
+        }, ctx.executor());
 
         producerQueuedFuture.thenRun(() -> {
             // If the producer is queued waiting, we will get an immediate 
notification
@@ -2931,6 +2931,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     @Override
     public void closeProducer(Producer producer) {
+        assert ctx.executor().inEventLoop();
         // removes producer-connection from map and send close command to 
producer
         safelyRemoveProducer(producer);
         if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {

Reply via email to