This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 61f6fcd9e2daf1c449cade842fecd80effc6d6cf Author: Yunze Xu <[email protected]> AuthorDate: Tue Oct 28 22:53:04 2025 +0800 [improve][broker] Reduce the broker close time to avoid useless wait for event loop shutdown (#24895) (cherry picked from commit 411aea9f193553d13ef9daa760c81c21567f652b) --- .../pulsar/broker/service/BrokerService.java | 34 ++++++----- .../service/BrokerEventLoopShutdownTest.java | 71 ++++++++++++++++++++++ 2 files changed, 91 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8c4d1712b73..6369a5499d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -97,6 +97,7 @@ import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -206,7 +207,6 @@ public class BrokerService implements Closeable { private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION = FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class, "futureWithDeadline(...)"); - private static final long GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS = 5000L; private static final double GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25d; private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d; @@ -308,7 +308,7 @@ public class BrokerService implements Closeable { // fallback if recover BucketDelayedDeliveryTracker failed. private volatile DelayedDeliveryTrackerFactory fallbackDelayedDeliveryTrackerFactory; private final ServerBootstrap defaultServerBootstrap; - private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList<>(); + private final List<Pair<String, EventLoopGroup>> protocolHandlersWorkerGroups = new ArrayList<>(); @Getter private final BundlesQuotas bundlesQuotas; @@ -530,7 +530,7 @@ public class BrokerService implements Closeable { EventLoopGroup dedicatedWorkerGroup = EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, defaultThreadFactory); bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup)); - protocolHandlersWorkerGroups.add(dedicatedWorkerGroup); + protocolHandlersWorkerGroups.add(Pair.of(protocol, dedicatedWorkerGroup)); bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup); } else { bootstrap = defaultServerBootstrap.clone(); @@ -841,10 +841,10 @@ public class BrokerService implements Closeable { CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>(); log.info("Event loops shutting down gracefully..."); List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>(); - shutdownEventLoops.add(shutdownEventLoopGracefully(acceptorGroup)); - shutdownEventLoops.add(shutdownEventLoopGracefully(workerGroup)); - for (EventLoopGroup group : protocolHandlersWorkerGroups) { - shutdownEventLoops.add(shutdownEventLoopGracefully(group)); + shutdownEventLoops.add(shutdownEventLoopGracefully("acceptor", acceptorGroup)); + shutdownEventLoops.add(shutdownEventLoopGracefully("worker", workerGroup)); + for (final var pair : protocolHandlersWorkerGroups) { + shutdownEventLoops.add(shutdownEventLoopGracefully(pair.getLeft(), pair.getRight())); } CompletableFuture<Void> shutdownFuture = @@ -939,15 +939,21 @@ public class BrokerService implements Closeable { } } - CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup) { + CompletableFuture<Void> shutdownEventLoopGracefully(String name, EventLoopGroup eventLoopGroup) { long brokerShutdownTimeoutMs = pulsar.getConfiguration().getBrokerShutdownTimeoutMs(); - long quietPeriod = Math.min((long) ( - GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs), - GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS); long timeout = (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs); - return NettyFutureUtil.toCompletableFutureVoid( - eventLoopGroup.shutdownGracefully(quietPeriod, - timeout, MILLISECONDS)); + long periodMs = (timeout > 0) ? 1 : 0; + long startNs = System.nanoTime(); + return NettyFutureUtil.toCompletableFutureVoid(eventLoopGroup.shutdownGracefully( + periodMs, timeout, MILLISECONDS) + ).whenComplete((__, e) -> { + final var elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + if (e == null) { + log.info("Event loop {} shut down after {} ms", name, elapsedMs); + } else { + log.warn("Failed to shut down event loop {} after {} ms: {}", name, elapsedMs, e.getMessage()); + } + }); } private CompletableFuture<Void> closeChannel(Channel channel) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java new file mode 100644 index 00000000000..29421f155b6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test +public class BrokerEventLoopShutdownTest { + + private LocalBookkeeperEnsemble bk; + + @BeforeClass(alwaysRun = true) + public void setup() throws Exception { + bk = new LocalBookkeeperEnsemble(2, 0, () -> 0); + bk.start(); + } + + @AfterClass(alwaysRun = true, timeOut = 30000) + public void cleanup() throws Exception { + bk.stop(); + } + + @Test(timeOut = 60000) + public void testCloseOneBroker() throws Exception { + final var clusterName = "test"; + final Supplier<ServiceConfiguration> configSupplier = () -> { + final var config = new ServiceConfiguration(); + config.setClusterName(clusterName); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort()); + return config; + }; + @Cleanup final var broker0 = new PulsarService(configSupplier.get()); + @Cleanup final var broker1 = new PulsarService(configSupplier.get()); + broker0.start(); + broker1.start(); + + final var startNs = System.nanoTime(); + broker0.close(); + final var closeTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + Assert.assertTrue(closeTimeMs < 1000, "close time: " + closeTimeMs + " ms"); + } +}
