This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6ab322ed399 [fix][broker] Fix PulsarService/BrokerService shutdown
when brokerShutdownTimeoutMs=0 (#21496)
6ab322ed399 is described below
commit 6ab322ed39901ca1b3e375ccf66d26a24da5874f
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 1 20:38:48 2023 +0200
[fix][broker] Fix PulsarService/BrokerService shutdown when
brokerShutdownTimeoutMs=0 (#21496)
---
.../src/main/java/org/apache/pulsar/broker/PulsarService.java | 10 +++++++---
.../java/org/apache/pulsar/broker/service/BrokerService.java | 9 ++++++++-
2 files changed, 15 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 84e044df0bf..18e7f554c99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -639,14 +639,18 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void>
future) {
+ long brokerShutdownTimeoutMs =
getConfiguration().getBrokerShutdownTimeoutMs();
+ if (brokerShutdownTimeoutMs <= 0) {
+ return future;
+ }
ScheduledExecutorService shutdownExecutor =
Executors.newSingleThreadScheduledExecutor(
new
ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() +
"-shutdown"));
FutureUtil.addTimeoutHandling(future,
- Duration.ofMillis(Math.max(1L,
getConfiguration().getBrokerShutdownTimeoutMs())),
+ Duration.ofMillis(brokerShutdownTimeoutMs),
shutdownExecutor, () ->
FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
future.handle((v, t) -> {
- if (t != null && getConfiguration().getBrokerShutdownTimeoutMs() >
0) {
- LOG.info("Shutdown timed out after {} ms",
getConfiguration().getBrokerShutdownTimeoutMs());
+ if (t instanceof TimeoutException) {
+ LOG.info("Shutdown timed out after {} ms",
brokerShutdownTimeoutMs);
LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());
}
// shutdown the shutdown executor
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1dccec6f305..93337bafd90 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -826,6 +826,7 @@ public class BrokerService implements Closeable {
for (EventLoopGroup group : protocolHandlersWorkerGroups) {
shutdownEventLoops.add(shutdownEventLoopGracefully(group));
}
+
CompletableFuture<Void> shutdownFuture =
CompletableFuture.allOf(shutdownEventLoops.toArray(new
CompletableFuture[0]))
.handle((v, t) -> {
@@ -836,7 +837,7 @@ public class BrokerService implements Closeable {
}
return null;
})
- .thenCompose(__ -> {
+ .thenComposeAsync(__ -> {
log.info("Continuing to second phase in
shutdown.");
List<CompletableFuture<Void>>
asyncCloseFutures = new ArrayList<>();
@@ -900,6 +901,12 @@ public class BrokerService implements Closeable {
return null;
});
return combined;
+ }, runnable -> {
+ // run the 2nd phase of the shutdown in a
separate thread
+ Thread thread = new Thread(runnable);
+
thread.setName("BrokerService-shutdown-phase2");
+ thread.setDaemon(false);
+ thread.start();
});
FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () ->
cancellableDownstreamFutureReference
.thenAccept(future -> future.cancel(false)));