This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 895e05d088f [improve][broker] Close protocol handlers before unloading
namespace bundles (#22728)
895e05d088f is described below
commit 895e05d088f4db23f1ca4d30e9d2ecaf7d3a6761
Author: Yunze Xu <[email protected]>
AuthorDate: Tue May 21 16:26:36 2024 +0800
[improve][broker] Close protocol handlers before unloading namespace
bundles (#22728)
---
.../org/apache/pulsar/broker/PulsarService.java | 12 +-
.../channel/ServiceUnitStateChannelImpl.java | 2 +-
.../broker/protocol/PulsarClientBasedHandler.java | 152 +++++++++++++++++++++
.../protocol/PulsarClientBasedHandlerTest.java | 87 ++++++++++++
.../protocol/SimpleProtocolHandlerTestsBase.java | 16 ++-
5 files changed, 258 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index bf266d44d83..1a45bedfce4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -427,6 +427,12 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
public CompletableFuture<Void> closeAsync() {
mutex.lock();
try {
+ // Close protocol handler before unloading namespace bundles because protocol handlers might maintain
+ // Pulsar clients that could send lookup requests that affect unloading.
+ if (protocolHandlers != null) {
+ protocolHandlers.close();
+ protocolHandlers = null;
+ }
if (closeFuture != null) {
return closeFuture;
}
@@ -434,6 +440,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
+ // It only tells the Pulsar clients that this service is not ready to serve for the lookup requests
state = State.Closing;
// close the service in reverse order v.s. in which they are started
@@ -492,11 +499,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
(long)
(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
* getConfiguration()
.getBrokerShutdownTimeoutMs())));
- // close protocol handler before closing broker service
- if (protocolHandlers != null) {
- protocolHandlers.close();
- protocolHandlers = null;
- }
// cancel loadShedding task and shutdown the loadManager executor before shutting down the broker
if (this.loadSheddingTask != null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index c7702a40d0b..477a9239538 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -114,7 +114,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
public static final CompressionType MSG_COMPRESSION_TYPE =
CompressionType.ZSTD;
private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS =
100;
- private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS =
3000;
+ public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS =
3000;
public static final long VERSION_ID_INIT = 1; // initial versionId
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
new file mode 100644
index 00000000000..ed9881a8cad
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+
+public class PulsarClientBasedHandler implements ProtocolHandler {
+
+ static final String PROTOCOL = "test";
+
+ private String topic;
+ private int partitions;
+ private String cluster;
+ private PulsarClient client;
+ private List<Reader<byte[]>> readers;
+ private ExecutorService executor;
+ private volatile boolean running = false;
+ volatile long closeTimeMs;
+
+ @Override
+ public String protocolName() {
+ return PROTOCOL;
+ }
+
+ @Override
+ public boolean accept(String protocol) {
+ return protocol.equals(PROTOCOL);
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration conf) throws Exception {
+ final var properties = conf.getProperties();
+ topic = (String) properties.getOrDefault("metadata.topic",
"metadata-topic");
+ partitions = (Integer) properties.getOrDefault("metadata.partitions",
1);
+ cluster = conf.getClusterName();
+ }
+
+ @Override
+ public String getProtocolDataToAdvertise() {
+ return "";
+ }
+
+ @Override
+ public void start(BrokerService service) {
+ try {
+ final var port =
service.getPulsar().getListenPortHTTP().orElseThrow();
+ @Cleanup final var admin =
PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build();
+ try {
+ admin.clusters().createCluster(cluster, ClusterData.builder()
+ .serviceUrl(service.getPulsar().getWebServiceAddress())
+
.serviceUrlTls(service.getPulsar().getWebServiceAddressTls())
+
.brokerServiceUrl(service.getPulsar().getBrokerServiceUrl())
+
.brokerServiceUrlTls(service.getPulsar().getBrokerServiceUrlTls())
+ .build());
+ } catch (PulsarAdminException ignored) {
+ }
+ try {
+ admin.tenants().createTenant("public",
TenantInfo.builder().allowedClusters(Set.of(cluster)).build());
+ } catch (PulsarAdminException ignored) {
+ }
+ try {
+ admin.namespaces().createNamespace("public/default");
+ } catch (PulsarAdminException ignored) {
+ }
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ final var port = service.getListenPort().orElseThrow();
+ client = PulsarClient.builder().serviceUrl("pulsar://localhost:" +
port).build();
+ readers = new ArrayList<>();
+ for (int i = 0; i < partitions; i++) {
+ readers.add(client.newReader().topic(topic +
TopicName.PARTITIONED_TOPIC_SUFFIX + i)
+ .startMessageId(MessageId.earliest).create());
+ }
+ running = true;
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(() -> {
+ while (running) {
+ readers.forEach(reader -> {
+ try {
+ reader.readNext(1, TimeUnit.MILLISECONDS);
+ } catch (PulsarClientException ignored) {
+ }
+ });
+ }
+ });
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>>
newChannelInitializers() {
+ return Map.of();
+ }
+
+ @Override
+ public void close() {
+ final var start = System.currentTimeMillis();
+ running = false;
+ if (client != null) {
+ try {
+ client.close();
+ } catch (PulsarClientException ignored) {
+ }
+ client = null;
+ }
+ if (executor != null) {
+ executor.shutdown();
+ executor = null;
+ }
+ closeTimeMs = System.currentTimeMillis() - start;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
new file mode 100644
index 00000000000..9cc20cf7b9d
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+import java.io.File;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarClientBasedHandlerTest {
+
+ private final static String clusterName = "cluster";
+ private final static int shutdownTimeoutMs = 100;
+ private final int zkPort = PortManager.nextFreePort();
+ private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
zkPort, PortManager::nextFreePort);
+ private File tempDirectory;
+ private PulsarService pulsar;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ bk.start();
+ final var config = new ServiceConfiguration();
+ config.setClusterName(clusterName);
+ config.setAdvertisedAddress("localhost");
+ config.setBrokerServicePort(Optional.of(0));
+ config.setWebServicePort(Optional.of(0));
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
+
+ tempDirectory =
SimpleProtocolHandlerTestsBase.configureProtocolHandler(config,
+ PulsarClientBasedHandler.class.getName(), true);
+
+
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+ config.setLoadBalancerDebugModeEnabled(true);
+ config.setBrokerShutdownTimeoutMs(shutdownTimeoutMs);
+
+ pulsar = new PulsarService(config);
+ pulsar.start();
+ }
+
+ @Test(timeOut = 30000)
+ public void testStopBroker() throws PulsarServerException {
+ final var beforeStop = System.currentTimeMillis();
+ final var handler = (PulsarClientBasedHandler)
pulsar.getProtocolHandlers()
+ .protocol(PulsarClientBasedHandler.PROTOCOL);
+ pulsar.close();
+ final var elapsedMs = System.currentTimeMillis() - beforeStop;
+ log.info("It spends {} ms to stop the broker ({} for protocol
handler)", elapsedMs, handler.closeTimeMs);
+ Assert.assertTrue(elapsedMs <
ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS
+ + handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void cleanup() throws Exception {
+ bk.stop();
+ if (tempDirectory != null) {
+ FileUtils.deleteDirectory(tempDirectory);
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
index c894b7d77c4..6c80f220c3d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
@@ -127,12 +127,18 @@ public abstract class SimpleProtocolHandlerTestsBase
extends BrokerTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
- tempDirectory =
Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
+ tempDirectory = configureProtocolHandler(conf,
MyProtocolHandler.class.getName(), useSeparateThreadPool);
+ super.baseSetup();
+ }
+
+ static File configureProtocolHandler(ServiceConfiguration conf, String
className, boolean useSeparateThreadPool)
+ throws Exception {
+ final var tempDirectory =
Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool);
conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath());
conf.setMessagingProtocols(Collections.singleton("test"));
- buildMockNarFile(tempDirectory);
- super.baseSetup();
+ buildMockNarFile(tempDirectory, className);
+ return tempDirectory;
}
@Test
@@ -163,7 +169,7 @@ public abstract class SimpleProtocolHandlerTestsBase
extends BrokerTestBase {
}
}
- private static void buildMockNarFile(File tempDirectory) throws Exception {
+ private static void buildMockNarFile(File tempDirectory, String className)
throws Exception {
File file = new File(tempDirectory, "temp.nar");
try (ZipOutputStream zipfile = new ZipOutputStream(new
FileOutputStream(file))) {
@@ -176,7 +182,7 @@ public abstract class SimpleProtocolHandlerTestsBase
extends BrokerTestBase {
zipfile.putNextEntry(manifest);
String yaml = "name: test\n" +
"description: this is a test\n" +
- "handlerClass: " + MyProtocolHandler.class.getName() +
"\n";
+ "handlerClass: " + className + "\n";
zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
zipfile.closeEntry();
}