codelipenghui commented on a change in pull request #10199:
URL: https://github.com/apache/pulsar/pull/10199#discussion_r613949002
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -308,10 +310,17 @@ public void close() throws PulsarServerException {
try {
closeAsync().get();
} catch (ExecutionException e) {
- if (e.getCause() instanceof PulsarServerException) {
- throw (PulsarServerException) e.getCause();
+ Throwable cause = e.getCause();
+ if (cause instanceof PulsarServerException) {
+ throw (PulsarServerException) cause;
+ } else if (cause instanceof TimeoutException) {
+ if (getConfiguration().getBrokerShutdownTimeoutMs() < 1000L) {
+ // ignore shutdown timeout when it's less than 1000ms (in
tests)
Review comment:
But even if we ignore the timeout when it is under 1000ms, can the tests
still hit the TimeoutException? Most tests extend
`MockedPulsarServiceBaseTest`; could we handle this in
`MockedPulsarServiceBaseTest` instead?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -694,60 +705,192 @@ public void close() throws IOException {
}
});
- List<CompletableFuture<Void>> asyncCloseFutures = new
ArrayList<>();
+ CompletableFuture<CompletableFuture<Void>>
cancellableDownstreamFutureReference = new CompletableFuture<>();
+ CompletableFuture<Void> shutdownFuture =
+
CompletableFuture.allOf(shutdownEventLoopGracefully(acceptorGroup),
+ shutdownEventLoopGracefully(workerGroup))
+ .handle((v, t) -> {
+ if (t != null) {
+ log.warn("Error shutting down event loops
gracefully", t);
+ } else {
+ log.info("Event loops shutdown
completed.");
+ }
+ return null;
+ })
+ .thenCompose(__ -> {
+ log.info("Continuing to second phase in
shutdown.");
- if (listenChannel != null && listenChannel.isOpen()) {
- asyncCloseFutures.add(closeChannel(listenChannel));
- }
+ List<CompletableFuture<Void>>
asyncCloseFutures = new ArrayList<>();
- if (listenChannelTls != null && listenChannelTls.isOpen()) {
- asyncCloseFutures.add(closeChannel(listenChannelTls));
- }
+ if (listenChannel != null &&
listenChannel.isOpen()) {
+
asyncCloseFutures.add(closeChannel(listenChannel));
+ }
+
+ if (listenChannelTls != null &&
listenChannelTls.isOpen()) {
+
asyncCloseFutures.add(closeChannel(listenChannelTls));
+ }
+
+ if (interceptor != null) {
+ interceptor.close();
+ interceptor = null;
+ }
- acceptorGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
+ try {
+ authenticationService.close();
+ } catch (IOException e) {
+ log.warn("Error in closing
authenticationService", e);
+ }
+ pulsarStats.close();
+
ClientCnxnAspect.removeListener(zkStatsListener);
+ ClientCnxnAspect.registerExecutor(null);
+ try {
+ delayedDeliveryTrackerFactory.close();
+ } catch (IOException e) {
+ log.warn("Error in closing
delayedDeliveryTrackerFactory", e);
+ }
- if (interceptor != null) {
- interceptor.close();
- interceptor = null;
+
asyncCloseFutures.add(GracefulExecutorServiceShutdownHandler
+ .shutdownGracefully(
+ (long)
(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
+ *
pulsar.getConfiguration().getBrokerShutdownTimeoutMs()),
+ statsUpdater,
+ inactivityMonitor,
+ messageExpiryMonitor,
+ compactionMonitor,
+ messagePublishBufferMonitor,
+ consumedLedgersMonitor,
+ backlogQuotaChecker,
+ topicOrderedExecutor,
+ topicPublishRateLimiterMonitor,
+
brokerPublishRateLimiterMonitor,
+ deduplicationSnapshotMonitor));
+
+ CompletableFuture<Void> combined =
+
FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures);
+
cancellableDownstreamFutureReference.complete(combined);
+ combined.handle((v, t) -> {
+ if (t == null) {
+ log.info("Broker service completely
shut down");
+ } else {
+ if (t instanceof
CancellationException) {
+ log.warn("Broker service didn't
complete gracefully. "
+ + "Terminating Broker
service.");
+ } else {
+ log.warn("Broker service shut down
completed with exception", t);
+ }
+ }
+ return null;
+ });
+ return combined;
+ });
+ FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () ->
cancellableDownstreamFutureReference
+ .thenAccept(future -> future.cancel(false)));
+ return shutdownFuture;
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(e);
+ }
+ }
+
+ CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup
eventLoopGroup) {
+ long brokerShutdownTimeoutMs =
pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
+ long quietPeriod = Math.min((long) (
+ GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT *
brokerShutdownTimeoutMs),
+ GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS);
+ long timeout = (long)
(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
+ return NettyFutureUtil.toCompletableFutureVoid(
+ eventLoopGroup.shutdownGracefully(quietPeriod,
+ timeout, TimeUnit.MILLISECONDS));
+ }
+
+ @Slf4j
+ private static class GracefulExecutorServiceShutdownHandler {
+ private final ScheduledExecutorService shutdownScheduler =
Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory(getClass().getSimpleName()));
+ private final List<ExecutorService> executors;
+ private final CompletableFuture<Void> future;
+ private final long timeoutMs;
+
+ private GracefulExecutorServiceShutdownHandler(long timeoutMs,
ExecutorService... executorServices) {
+ this.timeoutMs = timeoutMs;
+ executors = Arrays.stream(executorServices)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ future = new CompletableFuture<>();
+ }
+
+ static CompletableFuture<Void> shutdownGracefully(long timeoutMs,
ExecutorService... executorServices) {
+ return new GracefulExecutorServiceShutdownHandler(timeoutMs,
executorServices).doShutdownGracefully();
+ }
+
+ private CompletableFuture<Void> doShutdownGracefully() {
+ log.info("Shutting down {} executors.", executors.size());
+ executors.forEach(ExecutorService::shutdown);
+ FutureUtil.whenCancelledOrTimedOut(future, () -> {
+ terminate();
+ });
+ checkCompletion();
+ if (!shutdownScheduler.isShutdown()) {
+ try {
+ shutdownScheduler.schedule(this::terminate, timeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (RejectedExecutionException e) {
+ // ignore
+ }
}
+ return future;
+ }
- statsUpdater.shutdown();
- inactivityMonitor.shutdown();
- messageExpiryMonitor.shutdown();
- compactionMonitor.shutdown();
- messagePublishBufferMonitor.shutdown();
- consumedLedgersMonitor.shutdown();
- backlogQuotaChecker.shutdown();
- authenticationService.close();
- pulsarStats.close();
- ClientCnxnAspect.removeListener(zkStatsListener);
- ClientCnxnAspect.registerExecutor(null);
- topicOrderedExecutor.shutdown();
- delayedDeliveryTrackerFactory.close();
- if (topicPublishRateLimiterMonitor != null) {
- topicPublishRateLimiterMonitor.shutdown();
+ private void terminate() {
+ for (ExecutorService executor : executors) {
+ if (!executor.isTerminated()) {
+ log.info("Shutting down forcefully executor {}", executor);
+ for (Runnable runnable : executor.shutdownNow()) {
+ log.info("Execution in progress for runnable instance
of {}: {}", runnable.getClass(),
+ runnable);
+ }
+ }
}
- if (brokerPublishRateLimiterMonitor != null) {
- brokerPublishRateLimiterMonitor.shutdown();
+ shutdown();
+ }
+
+ private void shutdown() {
+ if (!shutdownScheduler.isShutdown()) {
+ log.info("Shutting down scheduler.");
+ shutdownScheduler.shutdown();
}
- if (deduplicationSnapshotMonitor != null) {
- deduplicationSnapshotMonitor.shutdown();
+ }
+
+ private void scheduleCheck() {
+ if (!shutdownScheduler.isShutdown()) {
+ try {
+ shutdownScheduler
+ .schedule(this::checkCompletion,
Math.max(timeoutMs / 100, 10), TimeUnit.MILLISECONDS);
Review comment:
I see.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org