This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c8db7c8442c [fix][broker] call ServerCnx#closeProducer from correct
thread (#20747)
c8db7c8442c is described below
commit c8db7c8442c355d2d95941a65140841af558612b
Author: Michael Marshall <[email protected]>
AuthorDate: Fri Jul 7 14:33:08 2023 -0500
[fix][broker] call ServerCnx#closeProducer from correct thread (#20747)
(cherry picked from commit 0bac873cba1b39f888fe359368217e8ac030f29a)
---
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 888668e15b1..130d312c2a2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1611,7 +1611,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
producers.remove(producerId, producerFuture);
- }).exceptionally(ex -> {
+ }).exceptionallyAsync(ex -> {
if (ex.getCause() instanceof
BrokerServiceException.TopicMigratedException) {
Optional<ClusterUrl> clusterURL =
getMigratedClusterUrl(service.getPulsar());
if (clusterURL.isPresent()) {
@@ -1652,7 +1652,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage());
}
return null;
- });
+ }, ctx.executor());
producerQueuedFuture.thenRun(() -> {
// If the producer is queued waiting, we will get an immediate
notification
@@ -2931,6 +2931,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
public void closeProducer(Producer producer) {
+ assert ctx.executor().inEventLoop();
// removes producer-connection from map and send close command to
producer
safelyRemoveProducer(producer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {