This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 77fd54e6a59749201aeb462e5c20256e75cd1776
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Jun 18 08:43:40 2024 -0700

    [fix][broker] Update init and shutdown time and other minor logic (ExtensibleLoadManagerImpl only) (#22930)
    
    (cherry picked from commit aa8f696b8e17a49d1a7ff6cdc25f1d86e7c4a8ed)
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java  |  4 ++--
 .../extensions/ExtensibleLoadManagerImpl.java      |  6 +++--
 .../channel/ServiceUnitStateChannelImpl.java       | 26 ++++++++++++++++------
 .../store/TableViewLoadDataStoreImpl.java          | 12 +++++-----
 .../pulsar/broker/namespace/NamespaceService.java  |  5 +++--
 .../protocol/PulsarClientBasedHandlerTest.java     |  3 +--
 6 files changed, 36 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 794b5399c99..1083fc8737d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -373,8 +373,8 @@ public class PulsarClusterMetadataSetup {
         }
     }
 
-    static void createNamespaceIfAbsent(PulsarResources resources, 
NamespaceName namespaceName,
-            String cluster, int bundleNumber) throws IOException {
+    public static void createNamespaceIfAbsent(PulsarResources resources, 
NamespaceName namespaceName,
+                                               String cluster, int 
bundleNumber) throws IOException {
         NamespaceResources namespaceResources = 
resources.getNamespaceResources();
 
         if (!namespaceResources.namespaceExists(namespaceName)) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index ad5e539ed33..cba9bd22c03 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -669,7 +669,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
 
     public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId 
bundle,
                                                               Optional<String> 
destinationBroker,
-                                                              boolean force) {
+                                                              boolean force,
+                                                              long timeout,
+                                                              TimeUnit 
timeoutUnit) {
         if 
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
 {
             log.info("Skip unloading namespace bundle: {}.", bundle);
             return CompletableFuture.completedFuture(null);
@@ -692,7 +694,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                     UnloadDecision unloadDecision =
                             new UnloadDecision(unload, 
UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
                     return unloadAsync(unloadDecision,
-                            conf.getNamespaceBundleUnloadingTimeoutMs(), 
TimeUnit.MILLISECONDS);
+                            timeout, timeoutUnit);
                 });
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index fc806adc0d6..25173f61c06 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -113,7 +113,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     public static final CompressionType MSG_COMPRESSION_TYPE = 
CompressionType.ZSTD;
     private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
     private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 
100;
-    public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 
3000;
     public static final long VERSION_ID_INIT = 1; // initial versionId
     public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 
mins
     private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs 
to clean immediately
@@ -298,7 +297,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                     (pulsar.getPulsarResources(), 
SYSTEM_NAMESPACE.getTenant(), config.getClusterName());
 
             PulsarClusterMetadataSetup.createNamespaceIfAbsent
-                    (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, 
config.getClusterName());
+                    (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, 
config.getClusterName(),
+                            config.getDefaultNumberOfNamespaceBundles());
 
             ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);
 
@@ -1013,6 +1013,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                     if (ex != null) {
                         log.error("Failed to close topics under bundle:{} in 
{} ms",
                                 bundle.toString(), unloadBundleTime, ex);
+                        if (!disconnectClients) {
+                            
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
+                        }
                     } else {
                         log.info("Unloading bundle:{} with {} topics completed 
in {} ms",
                                 bundle, unloadedTopics, unloadBundleTime);
@@ -1355,11 +1358,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 }
             }
             if (cleaned) {
-                try {
-                    
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
-                } catch (InterruptedException e) {
-                    log.warn("Interrupted while gracefully waiting for the 
cleanup convergence.");
-                }
                 break;
             } else {
                 try {
@@ -1370,9 +1368,23 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 }
             }
         }
+        log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} 
ms", brokerId,
+                System.currentTimeMillis() - started);
     }
 
     private synchronized void doCleanup(String broker)  {
+        try {
+            if 
(getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, 
TimeUnit.SECONDS)
+                    .isEmpty()) {
+                log.error("Found the channel owner is empty. Skip the inactive 
broker:{}'s orphan bundle cleanup",
+                        broker);
+                return;
+            }
+        } catch (Exception e) {
+            log.error("Failed to find the channel owner. Skip the inactive 
broker:{}'s orphan bundle cleanup", broker);
+            return;
+        }
+
         long startTime = System.nanoTime();
         log.info("Started ownership cleanup for the inactive broker:{}", 
broker);
         int orphanServiceUnitCleanupCnt = 0;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index 81cf33b4a55..e9289d3ccda 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 
@@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.TableView;
 public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
 
     private static final long 
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
+    private static final long INIT_TIMEOUT_IN_SECS = 5;
 
     private volatile TableView<T> tableView;
     private volatile long tableViewLastUpdateTimestamp;
@@ -123,10 +123,11 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
     public synchronized void startTableView() throws LoadDataStoreException {
         if (tableView == null) {
             try {
-                tableView = 
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+                tableView = 
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync()
+                        .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
                 tableView.forEachAndListen((k, v) ->
                         tableViewLastUpdateTimestamp = 
System.currentTimeMillis());
-            } catch (PulsarClientException e) {
+            } catch (Exception e) {
                 tableView = null;
                 throw new LoadDataStoreException(e);
             }
@@ -137,8 +138,9 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
     public synchronized void startProducer() throws LoadDataStoreException {
         if (producer == null) {
             try {
-                producer = 
client.newProducer(Schema.JSON(clazz)).topic(topic).create();
-            } catch (PulsarClientException e) {
+                producer = 
client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync()
+                        .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+            } catch (Exception e) {
                 producer = null;
                 throw new LoadDataStoreException(e);
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 16c8c4c702f..528f7cd9aa7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -783,7 +783,7 @@ public class NamespaceService implements AutoCloseable {
                                                          boolean 
closeWithoutWaitingClientDisconnect) {
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return ExtensibleLoadManagerImpl.get(loadManager.get())
-                    .unloadNamespaceBundleAsync(bundle, destinationBroker, 
false);
+                    .unloadNamespaceBundleAsync(bundle, destinationBroker, 
false, timeout, timeoutUnit);
         }
         // unload namespace bundle
         OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
@@ -1233,7 +1233,8 @@ public class NamespaceService implements AutoCloseable {
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             ExtensibleLoadManagerImpl extensibleLoadManager = 
ExtensibleLoadManagerImpl.get(loadManager.get());
             future = extensibleLoadManager.unloadNamespaceBundleAsync(
-                    nsBundle, Optional.empty(), true);
+                    nsBundle, Optional.empty(), true,
+                    pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), 
TimeUnit.MILLISECONDS);
         } else {
             future = ownershipCache.removeOwnership(nsBundle);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
index 9cc20cf7b9d..bdaddf9afb1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
@@ -27,7 +27,6 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
-import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -73,7 +72,7 @@ public class PulsarClientBasedHandlerTest {
         pulsar.close();
         final var elapsedMs = System.currentTimeMillis() - beforeStop;
         log.info("It spends {} ms to stop the broker ({} for protocol 
handler)", elapsedMs, handler.closeTimeMs);
-        Assert.assertTrue(elapsedMs < 
ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS
+        Assert.assertTrue(elapsedMs <
                 + handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 
1 more second for other processes
     }
 

Reply via email to