This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f8f256cfbdc [fix][broker] Continue closing even when executor is shut down (#22599)
f8f256cfbdc is described below

commit f8f256cfbdcd780c81442dc5566b6ed071141645
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Apr 26 23:16:54 2024 +0300

    [fix][broker] Continue closing even when executor is shut down (#22599)
---
 .../pulsar/broker/service/persistent/PersistentTopic.java      | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c7d762d595c..155b6777882 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1429,7 +1430,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 }
                 FutureUtil.waitForAll(futures).thenRunAsync(() -> {
                     closeClientFuture.complete(null);
-                }, getOrderedExecutor()).exceptionally(ex -> {
+                }, command -> {
+                    try {
+                        getOrderedExecutor().execute(command);
+                    } catch (RejectedExecutionException e) {
+                        // executor has been shut down, execute in current thread
+                        command.run();
+                    }
+                }).exceptionally(ex -> {
                     log.error("[{}] Error closing clients", topic, ex);
                     unfenceTopicToResume();
                     closeClientFuture.completeExceptionally(ex);

Reply via email to