This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f8f256cfbdc [fix][broker] Continue closing even when executor is shut
down (#22599)
f8f256cfbdc is described below
commit f8f256cfbdcd780c81442dc5566b6ed071141645
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Apr 26 23:16:54 2024 +0300
[fix][broker] Continue closing even when executor is shut down (#22599)
---
.../pulsar/broker/service/persistent/PersistentTopic.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c7d762d595c..155b6777882 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1429,7 +1430,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
FutureUtil.waitForAll(futures).thenRunAsync(() -> {
closeClientFuture.complete(null);
- }, getOrderedExecutor()).exceptionally(ex -> {
+ }, command -> {
+ try {
+ getOrderedExecutor().execute(command);
+ } catch (RejectedExecutionException e) {
+ // executor has been shut down, execute in current
thread
+ command.run();
+ }
+ }).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);