This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit db43414efb934481b795f9b2a45311758a1fc384
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Apr 26 21:30:15 2024 +0800

    [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573)
    
    (cherry picked from commit f411e3c0f26eef98382c7d06ea1676781247149b)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   3 +
 .../store/TableViewLoadDataStoreImpl.java          |   6 +-
 .../pulsar/broker/service/BrokerService.java       |  11 +++
 .../extensions/ExtensibleLoadManagerCloseTest.java | 107 +++++++++++++++++++++
 4 files changed, 122 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2a17607376e..47509e9bc49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -424,6 +424,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 return closeFuture;
             }
             LOG.info("Closing PulsarService");
+            if (brokerService != null) {
+                brokerService.unloadNamespaceBundlesGracefully();
+            }
             state = State.Closing;
 
             // close the service in reverse order v.s. in which they are 
started
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index d916e917162..81cf33b4a55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -161,12 +161,8 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
     }
 
     private void validateProducer() {
-        if (producer == null || !producer.isConnected()) {
+        if (producer == null) {
             try {
-                if (producer != null) {
-                    producer.close();
-                }
-                producer = null;
                 startProducer();
                 log.info("Restarted producer on {}", topic);
             } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2f3f2560e1d..8a12178cef5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -288,6 +288,7 @@ public class BrokerService implements Closeable {
     private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors;
 
     private final TopicEventsDispatcher topicEventsDispatcher = new 
TopicEventsDispatcher();
+    private volatile boolean unloaded = false;
 
     public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) 
throws Exception {
         this.pulsar = pulsar;
@@ -956,9 +957,13 @@ public class BrokerService implements Closeable {
     }
 
     public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, 
boolean closeWithoutWaitingClientDisconnect) {
+        if (unloaded) {
+            return;
+        }
         try {
             log.info("Unloading namespace-bundles...");
             // make broker-node unavailable from the cluster
+            long disableBrokerStartTime = System.nanoTime();
             if (pulsar.getLoadManager() != null && 
pulsar.getLoadManager().get() != null) {
                 try {
                     pulsar.getLoadManager().get().disableBroker();
@@ -967,6 +972,10 @@ public class BrokerService implements Closeable {
                     // still continue and release bundle ownership as broker's 
registration node doesn't exist.
                 }
             }
+            double disableBrokerTimeSeconds =
+                    TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - 
disableBrokerStartTime))
+                            / 1000.0;
+            log.info("Disable broker in load manager completed in {} seconds", 
disableBrokerTimeSeconds);
 
             // unload all namespace-bundles gracefully
             long closeTopicsStartTime = System.nanoTime();
@@ -1001,6 +1010,8 @@ public class BrokerService implements Closeable {
             }
         } catch (Exception e) {
             log.error("Failed to disable broker from loadbalancer list {}", 
e.getMessage(), e);
+        } finally {
+            unloaded = true;
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
new file mode 100644
index 00000000000..41413f3e3a9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ExtensibleLoadManagerCloseTest {
+
+    private static final String clusterName = "test";
+    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1, 
0, () -> 0);
+    private final List<PulsarService> brokers = new ArrayList<>();
+    private PulsarAdmin admin;
+
+    @BeforeClass(alwaysRun = true)
+    public void setup() throws Exception {
+        bk.start();
+        for (int i = 0; i < 3; i++) {
+            final var broker = new PulsarService(brokerConfig());
+            broker.start();
+            brokers.add(broker);
+        }
+        admin = brokers.get(0).getAdminClient();
+        admin.clusters().createCluster(clusterName, 
ClusterData.builder().build());
+        admin.tenants().createTenant("public", TenantInfo.builder()
+                .allowedClusters(Collections.singleton(clusterName)).build());
+        admin.namespaces().createNamespace("public/default");
+    }
+
+
+    @AfterClass(alwaysRun = true, timeOut = 30000)
+    public void cleanup() throws Exception {
+        bk.stop();
+    }
+
+    private ServiceConfiguration brokerConfig() {
+        final var config = new ServiceConfiguration();
+        config.setClusterName(clusterName);
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerServicePort(Optional.of(0));
+        config.setWebServicePort(Optional.of(0));
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort());
+        config.setManagedLedgerDefaultWriteQuorum(1);
+        config.setManagedLedgerDefaultAckQuorum(1);
+        config.setManagedLedgerDefaultEnsembleSize(1);
+        config.setDefaultNumberOfNamespaceBundles(16);
+        config.setLoadBalancerAutoBundleSplitEnabled(false);
+        
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        config.setLoadBalancerDebugModeEnabled(true);
+        config.setBrokerShutdownTimeoutMs(100);
+        return config;
+    }
+
+
+    @Test
+    public void testCloseAfterLoadingBundles() throws Exception {
+        final var topic = "test";
+        admin.topics().createPartitionedTopic(topic, 20);
+        admin.lookups().lookupPartitionedTopic(topic);
+        final var client = 
PulsarClient.builder().serviceUrl(brokers.get(0).getBrokerServiceUrl()).build();
+        final var producer = client.newProducer().topic(topic).create();
+        producer.close();
+        client.close();
+
+        final var closeTimeMsList = new ArrayList<Long>();
+        for (var broker : brokers) {
+            final var startTimeMs = System.currentTimeMillis();
+            broker.close();
+            closeTimeMsList.add(System.currentTimeMillis() - startTimeMs);
+        }
+        log.info("Brokers close time: {}", closeTimeMsList);
+        for (var closeTimeMs : closeTimeMsList) {
+            Assert.assertTrue(closeTimeMs < 5000L);
+        }
+    }
+}

Reply via email to