This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aa8f696b8e1 [fix][broker] Update init and shutdown time and other
minor logic (ExtensibleLoadManagerImpl only) (#22930)
aa8f696b8e1 is described below
commit aa8f696b8e17a49d1a7ff6cdc25f1d86e7c4a8ed
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Jun 18 08:43:40 2024 -0700
[fix][broker] Update init and shutdown time and other minor logic
(ExtensibleLoadManagerImpl only) (#22930)
---
.../apache/pulsar/PulsarClusterMetadataSetup.java | 4 ++--
.../extensions/ExtensibleLoadManagerImpl.java | 6 +++--
.../channel/ServiceUnitStateChannelImpl.java | 26 ++++++++++++++++------
.../store/TableViewLoadDataStoreImpl.java | 12 +++++-----
.../pulsar/broker/namespace/NamespaceService.java | 5 +++--
.../protocol/PulsarClientBasedHandlerTest.java | 3 +--
6 files changed, 36 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index d5b8df43a47..04a66ff022e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -375,8 +375,8 @@ public class PulsarClusterMetadataSetup {
}
}
- static void createNamespaceIfAbsent(PulsarResources resources,
NamespaceName namespaceName,
- String cluster, int bundleNumber) throws IOException {
+ public static void createNamespaceIfAbsent(PulsarResources resources,
NamespaceName namespaceName,
+ String cluster, int
bundleNumber) throws IOException {
NamespaceResources namespaceResources =
resources.getNamespaceResources();
if (!namespaceResources.namespaceExists(namespaceName)) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 1e519b3284f..92dcf8001ad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -668,7 +668,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId
bundle,
Optional<String>
destinationBroker,
- boolean force) {
+ boolean force,
+ long timeout,
+ TimeUnit
timeoutUnit) {
if
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
{
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
@@ -691,7 +693,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
UnloadDecision unloadDecision =
new UnloadDecision(unload,
UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
- conf.getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS);
+ timeout, timeoutUnit);
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index f04734c4ad9..1688a892e23 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -114,7 +114,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
public static final CompressionType MSG_COMPRESSION_TYPE =
CompressionType.ZSTD;
private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS =
100;
- public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS =
3000;
public static final long VERSION_ID_INIT = 1; // initial versionId
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3
mins
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs
to clean immediately
@@ -298,7 +297,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
(pulsar.getPulsarResources(),
SYSTEM_NAMESPACE.getTenant(), config.getClusterName());
PulsarClusterMetadataSetup.createNamespaceIfAbsent
- (pulsar.getPulsarResources(), SYSTEM_NAMESPACE,
config.getClusterName());
+ (pulsar.getPulsarResources(), SYSTEM_NAMESPACE,
config.getClusterName(),
+ config.getDefaultNumberOfNamespaceBundles());
ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);
@@ -1018,6 +1018,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (ex != null) {
log.error("Failed to close topics under bundle:{} in
{} ms",
bundle.toString(), unloadBundleTime, ex);
+ if (!disconnectClients) {
+
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
+ }
} else {
log.info("Unloading bundle:{} with {} topics completed
in {} ms",
bundle, unloadedTopics, unloadBundleTime);
@@ -1342,11 +1345,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
if (cleaned) {
- try {
-
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
- } catch (InterruptedException e) {
- log.warn("Interrupted while gracefully waiting for the
cleanup convergence.");
- }
break;
} else {
try {
@@ -1357,9 +1355,23 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
}
+ log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {}
ms", brokerId,
+ System.currentTimeMillis() - started);
}
private synchronized void doCleanup(String broker) {
+ try {
+ if
(getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS,
TimeUnit.SECONDS)
+ .isEmpty()) {
+ log.error("Found the channel owner is empty. Skip the inactive
broker:{}'s orphan bundle cleanup",
+ broker);
+ return;
+ }
+ } catch (Exception e) {
+ log.error("Failed to find the channel owner. Skip the inactive
broker:{}'s orphan bundle cleanup", broker);
+ return;
+ }
+
long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}",
broker);
int orphanServiceUnitCleanupCnt = 0;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index 81cf33b4a55..e9289d3ccda 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
@@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.TableView;
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
private static final long
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
+ private static final long INIT_TIMEOUT_IN_SECS = 5;
private volatile TableView<T> tableView;
private volatile long tableViewLastUpdateTimestamp;
@@ -123,10 +123,11 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
public synchronized void startTableView() throws LoadDataStoreException {
if (tableView == null) {
try {
- tableView =
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+ tableView =
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync()
+ .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
tableView.forEachAndListen((k, v) ->
tableViewLastUpdateTimestamp =
System.currentTimeMillis());
- } catch (PulsarClientException e) {
+ } catch (Exception e) {
tableView = null;
throw new LoadDataStoreException(e);
}
@@ -137,8 +138,9 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
public synchronized void startProducer() throws LoadDataStoreException {
if (producer == null) {
try {
- producer =
client.newProducer(Schema.JSON(clazz)).topic(topic).create();
- } catch (PulsarClientException e) {
+ producer =
client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync()
+ .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+ } catch (Exception e) {
producer = null;
throw new LoadDataStoreException(e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 9df2b09204c..df6a141ddcf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -840,7 +840,7 @@ public class NamespaceService implements AutoCloseable {
boolean
closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
- .unloadNamespaceBundleAsync(bundle, destinationBroker,
false);
+ .unloadNamespaceBundleAsync(bundle, destinationBroker,
false, timeout, timeoutUnit);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
@@ -1290,7 +1290,8 @@ public class NamespaceService implements AutoCloseable {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(
- nsBundle, Optional.empty(), true);
+ nsBundle, Optional.empty(), true,
+ pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
index 9cc20cf7b9d..bdaddf9afb1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
@@ -27,7 +27,6 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
-import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -73,7 +72,7 @@ public class PulsarClientBasedHandlerTest {
pulsar.close();
final var elapsedMs = System.currentTimeMillis() - beforeStop;
log.info("It spends {} ms to stop the broker ({} for protocol
handler)", elapsedMs, handler.closeTimeMs);
- Assert.assertTrue(elapsedMs <
ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS
+ Assert.assertTrue(elapsedMs <
+ handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate
1 more second for other processes
}