This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c834feb4593 [improve][broker] PIP-307: Add feature flag config option
(#21866)
c834feb4593 is described below
commit c834feb459369f497c68cd42dbc1625b11551f72
Author: Dragos Misca <[email protected]>
AuthorDate: Thu Jan 11 09:14:45 2024 -0800
[improve][broker] PIP-307: Add feature flag config option (#21866)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 8 +++
.../extensions/ExtensibleLoadManagerImpl.java | 7 +-
.../channel/ServiceUnitStateChannelImpl.java | 13 ++--
.../extensions/ExtensibleLoadManagerImplTest.java | 75 ++++++++++++++++++++++
4 files changed, 97 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index cfb66a7df78..4aed15922f0 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2689,6 +2689,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private long loadBalancerServiceUnitStateMonitorIntervalInSeconds = 60;
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Enables the multi-phase unloading of bundles. Set to true,
forwards destination broker information "
+ + "to consumers and producers during bundle unload,
allowing them to quickly reconnect to the "
+ + "broker without performing an additional topic lookup."
+ )
+ private boolean loadBalancerMultiPhaseBundleUnload = true;
+
/**** --- Replication. --- ****/
@FieldContext(
category = CATEGORY_REPLICATION,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 4664a840c92..06ece6ca641 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -315,11 +315,14 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
* Gets the assigned broker for the given topic.
* @param pulsar PulsarService instance
* @param topic Topic Name
- * @return the assigned broker's BrokerLookupData instance. Empty, if not
assigned by Extensible LoadManager.
+ * @return the assigned broker's BrokerLookupData instance. Empty, if not
assigned by Extensible LoadManager or the
+ * optimized bundle unload process is disabled.
*/
public static CompletableFuture<Optional<BrokerLookupData>>
getAssignedBrokerLookupData(PulsarService pulsar,
String topic) {
- if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+ var config = pulsar.getConfig();
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
+ && config.isLoadBalancerMultiPhaseBundleUnload()) {
var topicName = TopicName.get(topic);
try {
return pulsar.getNamespaceService().getBundleAsync(topicName)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 118667d2575..3e44771071f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -778,9 +778,12 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
lastOwnEventHandledAt = System.currentTimeMillis();
stateChangeListeners.notify(serviceUnit, data, null);
log(null, serviceUnit, data, null);
- } else if ((data.force() || isTransferCommand(data)) &&
isTargetBroker(data.sourceBroker())) {
- stateChangeListeners.notifyOnCompletion(
- closeServiceUnit(serviceUnit, true), serviceUnit,
data)
+ } else if (isTargetBroker(data.sourceBroker())) {
+ var isOrphanCleanup = data.force();
+ var isTransfer = isTransferCommand(data) &&
pulsar.getConfig().isLoadBalancerMultiPhaseBundleUnload();
+ var future = isOrphanCleanup || isTransfer
+ ? closeServiceUnit(serviceUnit, true) :
CompletableFuture.completedFuture(null);
+ stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
@@ -803,7 +806,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (isTransferCommand(data)) {
next = new ServiceUnitStateData(
Assigning, data.dstBroker(), data.sourceBroker(),
getNextVersionId(data));
- unloadFuture = closeServiceUnit(serviceUnit, false);
+ // If the optimized bundle unload is disabled, disconnect the
clients at time of RELEASE.
+ var disconnectClients =
!pulsar.getConfig().isLoadBalancerMultiPhaseBundleUnload();
+ unloadFuture = closeServiceUnit(serviceUnit,
disconnectClients);
} else {
next = new ServiceUnitStateData(
Free, null, data.sourceBroker(),
getNextVersionId(data));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 11d1ef900da..1f25828af2b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -224,6 +224,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().unload(defaultTestNamespace);
reset(primaryLoadManager, secondaryLoadManager);
FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService,
true);
+ pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
+ pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
}
@Test
@@ -654,6 +656,79 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
}
}
+ @DataProvider(name = "isPersistentTopicTest")
+ public Object[][] isPersistentTopicTest() {
+ return new Object[][]{{TopicDomain.persistent},
{TopicDomain.non_persistent}};
+ }
+
+ @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
+ public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws
Exception {
+ var id = String.format("test-optimize-unload-disable-%s-%s",
topicDomain, UUID.randomUUID());
+ var topic = String.format("%s://%s/%s", topicDomain,
defaultTestNamespace, id);
+ var topicName = TopicName.get(topic);
+
+ pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(false);
+ pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(false);
+
+ @Cleanup
+ var producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+ @Cleanup
+ var consumer =
pulsarClient.newConsumer(Schema.STRING).subscriptionName(id).topic(topic).subscribe();
+
+ Awaitility.await().until(() -> producer.isConnected() &&
consumer.isConnected());
+
+ var lookup = spyLookupService(pulsarClient);
+
+ final CountDownLatch cdl = new CountDownLatch(3);
+
+ NamespaceBundle bundle = getBundleAsync(pulsar1,
TopicName.get(topic)).get();
+ var srcBrokerServiceUrl = admin.lookups().lookupTopic(topic);
+ var dstBroker =
srcBrokerServiceUrl.equals(pulsar1.getBrokerServiceUrl()) ? pulsar2 : pulsar1;
+
+ CompletableFuture<Void> unloadNamespaceBundle =
CompletableFuture.runAsync(() -> {
+ try {
+ cdl.await();
+ admin.namespaces().unloadNamespaceBundle(defaultTestNamespace,
bundle.getBundleRange(),
+ dstBroker.getLookupServiceAddress());
+ } catch (InterruptedException | PulsarAdminException e) {
+ fail();
+ }
+ });
+
+ MutableInt sendCount = new MutableInt();
+ Awaitility.await().atMost(20,
TimeUnit.SECONDS).ignoreExceptions().until(() -> {
+ var message = String.format("message-%d", sendCount.getValue());
+
+ boolean messageSent = false;
+ while (true) {
+ var recvFuture = consumer.receiveAsync().orTimeout(1000,
TimeUnit.MILLISECONDS);
+
+ if (!messageSent) {
+ producer.send(message);
+ messageSent = true;
+ }
+
+ if (topicDomain == TopicDomain.non_persistent) {
+ // No need to wait for message receipt, we're only trying
to stress the consumer lookup pathway.
+ break;
+ }
+ var msg = recvFuture.get();
+ if (Objects.equals(msg.getValue(), message)) {
+ break;
+ }
+ }
+
+ cdl.countDown();
+ return sendCount.incrementAndGet() == 10;
+ });
+
+ assertTrue(producer.isConnected());
+ assertTrue(consumer.isConnected());
+ assertTrue(unloadNamespaceBundle.isDone());
+ verify(lookup, times(2)).getBroker(topicName);
+ }
+
private LookupService spyLookupService(PulsarClient client) throws
IllegalAccessException {
LookupService svc = (LookupService)
FieldUtils.readDeclaredField(client, "lookup", true);
var lookup = spy(svc);