This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c834feb4593 [improve][broker] PIP-307: Add feature flag config option 
(#21866)
c834feb4593 is described below

commit c834feb459369f497c68cd42dbc1625b11551f72
Author: Dragos Misca <[email protected]>
AuthorDate: Thu Jan 11 09:14:45 2024 -0800

    [improve][broker] PIP-307: Add feature flag config option (#21866)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  8 +++
 .../extensions/ExtensibleLoadManagerImpl.java      |  7 +-
 .../channel/ServiceUnitStateChannelImpl.java       | 13 ++--
 .../extensions/ExtensibleLoadManagerImplTest.java  | 75 ++++++++++++++++++++++
 4 files changed, 97 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index cfb66a7df78..4aed15922f0 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2689,6 +2689,14 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private long loadBalancerServiceUnitStateMonitorIntervalInSeconds = 60;
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Enables the multi-phase unloading of bundles. Set to true, 
forwards destination broker information "
+                    + "to consumers and producers during bundle unload, 
allowing them to quickly reconnect to the "
+                    + "broker without performing an additional topic lookup."
+    )
+    private boolean loadBalancerMultiPhaseBundleUnload = true;
+
     /**** --- Replication. --- ****/
     @FieldContext(
         category = CATEGORY_REPLICATION,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 4664a840c92..06ece6ca641 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -315,11 +315,14 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
      * Gets the assigned broker for the given topic.
      * @param pulsar PulsarService instance
      * @param topic Topic Name
-     * @return the assigned broker's BrokerLookupData instance. Empty, if not 
assigned by Extensible LoadManager.
+     * @return the assigned broker's BrokerLookupData instance. Empty, if not 
assigned by Extensible LoadManager or the
+     *         optimized bundle unload process is disabled.
      */
     public static CompletableFuture<Optional<BrokerLookupData>> 
getAssignedBrokerLookupData(PulsarService pulsar,
                                                                           
String topic) {
-        if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+        var config = pulsar.getConfig();
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
+                && config.isLoadBalancerMultiPhaseBundleUnload()) {
             var topicName = TopicName.get(topic);
             try {
                 return pulsar.getNamespaceService().getBundleAsync(topicName)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 118667d2575..3e44771071f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -778,9 +778,12 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             lastOwnEventHandledAt = System.currentTimeMillis();
             stateChangeListeners.notify(serviceUnit, data, null);
             log(null, serviceUnit, data, null);
-        } else if ((data.force() || isTransferCommand(data)) && 
isTargetBroker(data.sourceBroker())) {
-            stateChangeListeners.notifyOnCompletion(
-                            closeServiceUnit(serviceUnit, true), serviceUnit, 
data)
+        } else if (isTargetBroker(data.sourceBroker())) {
+            var isOrphanCleanup = data.force();
+            var isTransfer = isTransferCommand(data) && 
pulsar.getConfig().isLoadBalancerMultiPhaseBundleUnload();
+            var future = isOrphanCleanup || isTransfer
+                    ? closeServiceUnit(serviceUnit, true) : 
CompletableFuture.completedFuture(null);
+            stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, null));
         } else {
             stateChangeListeners.notify(serviceUnit, data, null);
@@ -803,7 +806,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             if (isTransferCommand(data)) {
                 next = new ServiceUnitStateData(
                         Assigning, data.dstBroker(), data.sourceBroker(), 
getNextVersionId(data));
-                unloadFuture = closeServiceUnit(serviceUnit, false);
+                // If the optimized bundle unload is disabled, disconnect the 
clients at time of RELEASE.
+                var disconnectClients = 
!pulsar.getConfig().isLoadBalancerMultiPhaseBundleUnload();
+                unloadFuture = closeServiceUnit(serviceUnit, 
disconnectClients);
             } else {
                 next = new ServiceUnitStateData(
                         Free, null, data.sourceBroker(), 
getNextVersionId(data));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 11d1ef900da..1f25828af2b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -224,6 +224,8 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().unload(defaultTestNamespace);
         reset(primaryLoadManager, secondaryLoadManager);
         FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, 
true);
+        pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
+        pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
     }
 
     @Test
@@ -654,6 +656,79 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @DataProvider(name = "isPersistentTopicTest")
+    public Object[][] isPersistentTopicTest() {
+        return new Object[][]{{TopicDomain.persistent}, 
{TopicDomain.non_persistent}};
+    }
+
+    @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
+    public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws 
Exception {
+        var id = String.format("test-optimize-unload-disable-%s-%s", 
topicDomain, UUID.randomUUID());
+        var topic = String.format("%s://%s/%s", topicDomain, 
defaultTestNamespace, id);
+        var topicName = TopicName.get(topic);
+
+        pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(false);
+        pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(false);
+
+        @Cleanup
+        var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+        @Cleanup
+        var consumer = 
pulsarClient.newConsumer(Schema.STRING).subscriptionName(id).topic(topic).subscribe();
+
+        Awaitility.await().until(() -> producer.isConnected() && 
consumer.isConnected());
+
+        var lookup = spyLookupService(pulsarClient);
+
+        final CountDownLatch cdl = new CountDownLatch(3);
+
+        NamespaceBundle bundle = getBundleAsync(pulsar1, 
TopicName.get(topic)).get();
+        var srcBrokerServiceUrl = admin.lookups().lookupTopic(topic);
+        var dstBroker = 
srcBrokerServiceUrl.equals(pulsar1.getBrokerServiceUrl()) ? pulsar2 : pulsar1;
+
+        CompletableFuture<Void> unloadNamespaceBundle = 
CompletableFuture.runAsync(() -> {
+            try {
+                cdl.await();
+                admin.namespaces().unloadNamespaceBundle(defaultTestNamespace, 
bundle.getBundleRange(),
+                        dstBroker.getLookupServiceAddress());
+            } catch (InterruptedException | PulsarAdminException e) {
+                fail();
+            }
+        });
+
+        MutableInt sendCount = new MutableInt();
+        Awaitility.await().atMost(20, 
TimeUnit.SECONDS).ignoreExceptions().until(() -> {
+            var message = String.format("message-%d", sendCount.getValue());
+
+            boolean messageSent = false;
+            while (true) {
+                var recvFuture = consumer.receiveAsync().orTimeout(1000, 
TimeUnit.MILLISECONDS);
+
+                if (!messageSent) {
+                    producer.send(message);
+                    messageSent = true;
+                }
+
+                if (topicDomain == TopicDomain.non_persistent) {
+                    // No need to wait for message receipt, we're only trying 
to stress the consumer lookup pathway.
+                    break;
+                }
+                var msg = recvFuture.get();
+                if (Objects.equals(msg.getValue(), message)) {
+                    break;
+                }
+            }
+
+            cdl.countDown();
+            return sendCount.incrementAndGet() == 10;
+        });
+
+        assertTrue(producer.isConnected());
+        assertTrue(consumer.isConnected());
+        assertTrue(unloadNamespaceBundle.isDone());
+        verify(lookup, times(2)).getBroker(topicName);
+    }
+
     private LookupService spyLookupService(PulsarClient client) throws 
IllegalAccessException {
         LookupService svc = (LookupService) 
FieldUtils.readDeclaredField(client, "lookup", true);
         var lookup = spy(svc);

Reply via email to