This is an automated email from the ASF dual-hosted git repository. heesung pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d9c51322ce5566cc706e00dfe12b4976396656f5 Author: Heesung Sohn <[email protected]> AuthorDate: Tue Jun 18 08:43:40 2024 -0700 [fix][broker] Update init and shutdown time and other minor logic (ExtensibleLoadManagerImpl only) (#22930) (cherry picked from commit aa8f696b8e17a49d1a7ff6cdc25f1d86e7c4a8ed) (cherry picked from commit 9b6156ab452756ad8356f94793c785901c69de8e) --- .../apache/pulsar/PulsarClusterMetadataSetup.java | 4 ++-- .../extensions/ExtensibleLoadManagerImpl.java | 6 ++++-- .../channel/ServiceUnitStateChannelImpl.java | 23 +++++++++++++++------- .../store/TableViewLoadDataStoreImpl.java | 12 ++++++----- .../pulsar/broker/namespace/NamespaceService.java | 5 +++-- 5 files changed, 32 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 9b757c55ccd..ceeb4e2ef90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -373,8 +373,8 @@ public class PulsarClusterMetadataSetup { } } - static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName, - String cluster, int bundleNumber) throws IOException { + public static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName, + String cluster, int bundleNumber) throws IOException { NamespaceResources namespaceResources = resources.getNamespaceResources(); if (!namespaceResources.namespaceExists(namespaceName)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index fd4e94ba777..1c295fe0561 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -624,7 +624,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle, Optional<String> destinationBroker, - boolean force) { + boolean force, + long timeout, + TimeUnit timeoutUnit) { if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { log.info("Skip unloading namespace bundle: {}.", bundle); return CompletableFuture.completedFuture(null); @@ -647,7 +649,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { UnloadDecision unloadDecision = new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin); return unloadAsync(unloadDecision, - conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); + timeout, timeoutUnit); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 6c3c5f35dce..0feed5e8333 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -115,7 +115,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000; private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100; - private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; public static final long VERSION_ID_INIT = 1; // initial versionId private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60; public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins @@ -305,7 +304,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } } PulsarClusterMetadataSetup.createNamespaceIfAbsent - (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName()); + (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName(), + config.getDefaultNumberOfNamespaceBundles()); ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC); @@ -1315,11 +1315,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } } if (cleaned) { - try { - MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS); - } catch (InterruptedException e) { - log.warn("Interrupted while gracefully waiting for the cleanup convergence."); - } break; } else { try { @@ -1330,9 +1325,23 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } } } + log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId, + System.currentTimeMillis() - started); } private synchronized void doCleanup(String broker) { + try { + if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS) + .isEmpty()) { + log.error("Found the channel owner is empty. Skip the inactive broker:{}'s orphan bundle cleanup", + broker); + return; + } + } catch (Exception e) { + log.error("Failed to find the channel owner. Skip the inactive broker:{}'s orphan bundle cleanup", broker); + return; + } + long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index 81cf33b4a55..e9289d3ccda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -31,7 +31,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; @@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.TableView; public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> { private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2; + private static final long INIT_TIMEOUT_IN_SECS = 5; private volatile TableView<T> tableView; private volatile long tableViewLastUpdateTimestamp; @@ -123,10 +123,11 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> { public synchronized void startTableView() throws LoadDataStoreException { if (tableView == null) { try { - tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create(); + tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync() + .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS); tableView.forEachAndListen((k, v) -> tableViewLastUpdateTimestamp = System.currentTimeMillis()); - } catch (PulsarClientException e) { + } catch (Exception e) { tableView = null; throw new LoadDataStoreException(e); } @@ -137,8 +138,9 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> { public synchronized void startProducer() throws LoadDataStoreException { if (producer == null) { try { - producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create(); - } catch (PulsarClientException e) { + producer = client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync() + .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS); + } catch (Exception e) { producer = null; throw new LoadDataStoreException(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 18290f526c5..d8bd2bb3411 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -788,7 +788,7 @@ public class NamespaceService implements AutoCloseable { boolean closeWithoutWaitingClientDisconnect) { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { return ExtensibleLoadManagerImpl.get(loadManager.get()) - .unloadNamespaceBundleAsync(bundle, destinationBroker, false); + .unloadNamespaceBundleAsync(bundle, destinationBroker, false, timeout, timeoutUnit); } // unload namespace bundle OwnedBundle ob = ownershipCache.getOwnedBundle(bundle); @@ -1237,7 +1237,8 @@ public class NamespaceService implements AutoCloseable { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); future = extensibleLoadManager.unloadNamespaceBundleAsync( - nsBundle, Optional.empty(), true); + nsBundle, Optional.empty(), true, + pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); } else { future = ownershipCache.removeOwnership(nsBundle); }
