lhotari commented on a change in pull request #10199:
URL: https://github.com/apache/pulsar/pull/10199#discussion_r612550100



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -694,60 +705,192 @@ public void close() throws IOException {
                 }
             });
 
-            List<CompletableFuture<Void>> asyncCloseFutures = new 
ArrayList<>();
+            CompletableFuture<CompletableFuture<Void>> 
cancellableDownstreamFutureReference = new CompletableFuture<>();
+            CompletableFuture<Void> shutdownFuture =
+                    
CompletableFuture.allOf(shutdownEventLoopGracefully(acceptorGroup),
+                            shutdownEventLoopGracefully(workerGroup))
+                            .handle((v, t) -> {
+                                if (t != null) {
+                                    log.warn("Error shutting down event loops 
gracefully", t);
+                                } else {
+                                    log.info("Event loops shutdown 
completed.");
+                                }
+                                return null;
+                            })
+                            .thenCompose(__ -> {
+                                log.info("Continuing to second phase in 
shutdown.");
 
-            if (listenChannel != null && listenChannel.isOpen()) {
-                asyncCloseFutures.add(closeChannel(listenChannel));
-            }
+                                List<CompletableFuture<Void>> 
asyncCloseFutures = new ArrayList<>();
 
-            if (listenChannelTls != null && listenChannelTls.isOpen()) {
-                asyncCloseFutures.add(closeChannel(listenChannelTls));
-            }
+                                if (listenChannel != null && 
listenChannel.isOpen()) {
+                                    
asyncCloseFutures.add(closeChannel(listenChannel));
+                                }
+
+                                if (listenChannelTls != null && 
listenChannelTls.isOpen()) {
+                                    
asyncCloseFutures.add(closeChannel(listenChannelTls));
+                                }
+
+                                if (interceptor != null) {
+                                    interceptor.close();
+                                    interceptor = null;
+                                }
 
-            acceptorGroup.shutdownGracefully();
-            workerGroup.shutdownGracefully();
+                                try {
+                                    authenticationService.close();
+                                } catch (IOException e) {
+                                    log.warn("Error in closing 
authenticationService", e);
+                                }
+                                pulsarStats.close();
+                                
ClientCnxnAspect.removeListener(zkStatsListener);
+                                ClientCnxnAspect.registerExecutor(null);
+                                try {
+                                    delayedDeliveryTrackerFactory.close();
+                                } catch (IOException e) {
+                                    log.warn("Error in closing 
delayedDeliveryTrackerFactory", e);
+                                }
 
-            if (interceptor != null) {
-                interceptor.close();
-                interceptor = null;
+                                
asyncCloseFutures.add(GracefulExecutorServiceShutdownHandler
+                                        .shutdownGracefully(
+                                                (long) 
(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
+                                                        * 
pulsar.getConfiguration().getBrokerShutdownTimeoutMs()),
+                                                statsUpdater,
+                                                inactivityMonitor,
+                                                messageExpiryMonitor,
+                                                compactionMonitor,
+                                                messagePublishBufferMonitor,
+                                                consumedLedgersMonitor,
+                                                backlogQuotaChecker,
+                                                topicOrderedExecutor,
+                                                topicPublishRateLimiterMonitor,
+                                                
brokerPublishRateLimiterMonitor,
+                                                deduplicationSnapshotMonitor));
+
+                                CompletableFuture<Void> combined =
+                                        
FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures);
+                                
cancellableDownstreamFutureReference.complete(combined);
+                                combined.handle((v, t) -> {
+                                    if (t == null) {
+                                        log.info("Broker service completely 
shut down");
+                                    } else {
+                                        if (t instanceof 
CancellationException) {
+                                            log.warn("Broker service didn't 
complete gracefully. "
+                                                    + "Terminating Broker 
service.");
+                                        } else {
+                                            log.warn("Broker service shut down 
completed with exception", t);
+                                        }
+                                    }
+                                    return null;
+                                });
+                                return combined;
+                            });
+            FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () -> 
cancellableDownstreamFutureReference
+                    .thenAccept(future -> future.cancel(false)));
+            return shutdownFuture;
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
+        }
+    }
+
+    CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup 
eventLoopGroup) {
+        long brokerShutdownTimeoutMs = 
pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
+        long quietPeriod = Math.min((long) (
+                GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * 
brokerShutdownTimeoutMs),
+                GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS);
+        long timeout = (long) 
(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
+        return NettyFutureUtil.toCompletableFutureVoid(
+                eventLoopGroup.shutdownGracefully(quietPeriod,
+                        timeout, TimeUnit.MILLISECONDS));
+    }
+
+    @Slf4j
+    private static class GracefulExecutorServiceShutdownHandler {
+        private final ScheduledExecutorService shutdownScheduler = 
Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory(getClass().getSimpleName()));
+        private final List<ExecutorService> executors;
+        private final CompletableFuture<Void> future;
+        private final long timeoutMs;
+
+        private GracefulExecutorServiceShutdownHandler(long timeoutMs, 
ExecutorService... executorServices) {
+            this.timeoutMs = timeoutMs;
+            executors = Arrays.stream(executorServices)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+            future = new CompletableFuture<>();
+        }
+
+        static CompletableFuture<Void> shutdownGracefully(long timeoutMs, 
ExecutorService... executorServices) {
+            return new GracefulExecutorServiceShutdownHandler(timeoutMs, 
executorServices).doShutdownGracefully();
+        }
+
+        private CompletableFuture<Void> doShutdownGracefully() {
+            log.info("Shutting down {} executors.", executors.size());
+            executors.forEach(ExecutorService::shutdown);
+            FutureUtil.whenCancelledOrTimedOut(future, () -> {
+                terminate();
+            });
+            checkCompletion();
+            if (!shutdownScheduler.isShutdown()) {
+                try {
+                    shutdownScheduler.schedule(this::terminate, timeoutMs, 
TimeUnit.MILLISECONDS);
+                } catch (RejectedExecutionException e) {
+                    // ignore
+                }
             }
+            return future;
+        }
 
-            statsUpdater.shutdown();
-            inactivityMonitor.shutdown();
-            messageExpiryMonitor.shutdown();
-            compactionMonitor.shutdown();
-            messagePublishBufferMonitor.shutdown();
-            consumedLedgersMonitor.shutdown();
-            backlogQuotaChecker.shutdown();
-            authenticationService.close();
-            pulsarStats.close();
-            ClientCnxnAspect.removeListener(zkStatsListener);
-            ClientCnxnAspect.registerExecutor(null);
-            topicOrderedExecutor.shutdown();
-            delayedDeliveryTrackerFactory.close();
-            if (topicPublishRateLimiterMonitor != null) {
-                topicPublishRateLimiterMonitor.shutdown();
+        private void terminate() {
+            for (ExecutorService executor : executors) {
+                if (!executor.isTerminated()) {
+                    log.info("Shutting down forcefully executor {}", executor);
+                    for (Runnable runnable : executor.shutdownNow()) {
+                        log.info("Execution in progress for runnable instance 
of {}: {}", runnable.getClass(),
+                                runnable);
+                    }
+                }
             }
-            if (brokerPublishRateLimiterMonitor != null) {
-                brokerPublishRateLimiterMonitor.shutdown();
+            shutdown();
+        }
+
+        private void shutdown() {
+            if (!shutdownScheduler.isShutdown()) {
+                log.info("Shutting down scheduler.");
+                shutdownScheduler.shutdown();
             }
-            if (deduplicationSnapshotMonitor != null) {
-                deduplicationSnapshotMonitor.shutdown();
+        }
+
+        private void scheduleCheck() {
+            if (!shutdownScheduler.isShutdown()) {
+                try {
+                    shutdownScheduler
+                            .schedule(this::checkCompletion, 
Math.max(timeoutMs / 100, 10), TimeUnit.MILLISECONDS);

Review comment:
       It's intentional to use `Math.max(timeoutMs / 100, 10)` as the interval 
for checking whether all executors have terminated, i.e. 1% of the timeout but 
never less than 10 ms. If we used `timeoutMs` instead, we would have to wait for 
the full timeout duration before the first check is made. 
   The check is cheap, so it's fine to run it frequently. With a timeout of 
30000 ms, the check runs every 300 milliseconds; the minimum checking interval 
is 10 milliseconds. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to