This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 411aea9f193 [improve][broker] Reduce the broker close time to avoid useless wait for event loop shutdown (#24895)
411aea9f193 is described below
commit 411aea9f193553d13ef9daa760c81c21567f652b
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Oct 28 22:53:04 2025 +0800
[improve][broker] Reduce the broker close time to avoid useless wait for event loop shutdown (#24895)
---
.../pulsar/broker/service/BrokerService.java | 34 ++++++-----
.../service/BrokerEventLoopShutdownTest.java | 71 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 378fda44c2e..0a27a089f63 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -97,6 +97,7 @@ import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -206,7 +207,6 @@ public class BrokerService implements Closeable {
private static final TimeoutException
FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION =
FutureUtil.createTimeoutException("Failed to load topic within
timeout", BrokerService.class,
"futureWithDeadline(...)");
- private static final long GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS = 5000L;
private static final double
GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25d;
private static final double
GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
@@ -308,7 +308,7 @@ public class BrokerService implements Closeable {
// fallback if recover BucketDelayedDeliveryTracker failed.
private volatile DelayedDeliveryTrackerFactory
fallbackDelayedDeliveryTrackerFactory;
private final ServerBootstrap defaultServerBootstrap;
- private final List<EventLoopGroup> protocolHandlersWorkerGroups = new
ArrayList<>();
+ private final List<Pair<String, EventLoopGroup>>
protocolHandlersWorkerGroups = new ArrayList<>();
@Getter
private final BundlesQuotas bundlesQuotas;
@@ -546,7 +546,7 @@ public class BrokerService implements Closeable {
EventLoopGroup dedicatedWorkerGroup =
EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false,
defaultThreadFactory);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
- protocolHandlersWorkerGroups.add(dedicatedWorkerGroup);
+ protocolHandlersWorkerGroups.add(Pair.of(protocol,
dedicatedWorkerGroup));
bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
} else {
bootstrap = defaultServerBootstrap.clone();
@@ -863,10 +863,10 @@ public class BrokerService implements Closeable {
CompletableFuture<CompletableFuture<Void>>
cancellableDownstreamFutureReference = new CompletableFuture<>();
log.info("Event loops shutting down gracefully...");
List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>();
- shutdownEventLoops.add(shutdownEventLoopGracefully(acceptorGroup));
- shutdownEventLoops.add(shutdownEventLoopGracefully(workerGroup));
- for (EventLoopGroup group : protocolHandlersWorkerGroups) {
- shutdownEventLoops.add(shutdownEventLoopGracefully(group));
+ shutdownEventLoops.add(shutdownEventLoopGracefully("acceptor",
acceptorGroup));
+ shutdownEventLoops.add(shutdownEventLoopGracefully("worker",
workerGroup));
+ for (final var pair : protocolHandlersWorkerGroups) {
+
shutdownEventLoops.add(shutdownEventLoopGracefully(pair.getLeft(),
pair.getRight()));
}
CompletableFuture<Void> shutdownFuture =
@@ -961,15 +961,21 @@ public class BrokerService implements Closeable {
}
}
- CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup
eventLoopGroup) {
+ CompletableFuture<Void> shutdownEventLoopGracefully(String name,
EventLoopGroup eventLoopGroup) {
long brokerShutdownTimeoutMs =
pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
- long quietPeriod = Math.min((long) (
- GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT *
brokerShutdownTimeoutMs),
- GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS);
long timeout = (long)
(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
- return NettyFutureUtil.toCompletableFutureVoid(
- eventLoopGroup.shutdownGracefully(quietPeriod,
- timeout, MILLISECONDS));
+ long periodMs = (timeout > 0) ? 1 : 0;
+ long startNs = System.nanoTime();
+ return
NettyFutureUtil.toCompletableFutureVoid(eventLoopGroup.shutdownGracefully(
+ periodMs, timeout, MILLISECONDS)
+ ).whenComplete((__, e) -> {
+ final var elapsedMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+ if (e == null) {
+ log.info("Event loop {} shut down after {} ms", name,
elapsedMs);
+ } else {
+ log.warn("Failed to shut down event loop {} after {} ms: {}",
name, elapsedMs, e.getMessage());
+ }
+ });
}
private CompletableFuture<Void> closeChannel(Channel channel) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
new file mode 100644
index 00000000000..29421f155b6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test
+public class BrokerEventLoopShutdownTest {
+
+ private LocalBookkeeperEnsemble bk;
+
+ @BeforeClass(alwaysRun = true)
+ public void setup() throws Exception {
+ bk = new LocalBookkeeperEnsemble(2, 0, () -> 0);
+ bk.start();
+ }
+
+ @AfterClass(alwaysRun = true, timeOut = 30000)
+ public void cleanup() throws Exception {
+ bk.stop();
+ }
+
+ @Test(timeOut = 60000)
+ public void testCloseOneBroker() throws Exception {
+ final var clusterName = "test";
+ final Supplier<ServiceConfiguration> configSupplier = () -> {
+ final var config = new ServiceConfiguration();
+ config.setClusterName(clusterName);
+ config.setAdvertisedAddress("localhost");
+ config.setBrokerServicePort(Optional.of(0));
+ config.setWebServicePort(Optional.of(0));
+ config.setMetadataStoreUrl("zk:127.0.0.1:" +
bk.getZookeeperPort());
+ return config;
+ };
+ @Cleanup final var broker0 = new PulsarService(configSupplier.get());
+ @Cleanup final var broker1 = new PulsarService(configSupplier.get());
+ broker0.start();
+ broker1.start();
+
+ final var startNs = System.nanoTime();
+ broker0.close();
+ final var closeTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+ Assert.assertTrue(closeTimeMs < 1000, "close time: " + closeTimeMs + "
ms");
+ }
+}