This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 22115c39454 [improve][broker] Gracefully shut down load balancer 
extension (#20315)
22115c39454 is described below

commit 22115c394541e4f6f4890d429bcca455ebd0d1ec
Author: Heesung Sohn <[email protected]>
AuthorDate: Sun May 21 17:50:44 2023 -0700

    [improve][broker] Gracefully shut down load balancer extension (#20315)
    
    ### Motivation
    
    The load balancer extension needs to shut down gracefully, especially when 
shutting down the leader broker. When the leader broker closes the leader 
election service too late, service unit lookups to the leader broker could fail 
during the shutdown. This could delay client reconnection.
    
    Lookup failure logs
    ```
    (shutdown starts)
    pulsar-broker-8 pulsar-broker 2023-04-22T00:19:52,630+0000 
[pulsar-service-shutdown] INFO  org.apache.pulsar.broker.PulsarService - 
Closing PulsarService
    pulsar-broker-3 pulsar-broker 2023-04-22T00:19:52,690+0000 [pulsar-io-4-5] 
INFO  org.apache.pulsar.client.impl.ConnectionHandler - 
[persistent://pulsar/system/loadbalancer-service-unit-state] [pulsar-18-9] 
Closed connection [id: 0x907e74b0, L:/10.0.13.6:35838 ! 
R:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local/10.0.18.18:6650] -- 
Will try again in 0.1 s
    pulsar-broker-3 pulsar-broker 2023-04-22T00:19:52,691+0000 [pulsar-io-4-5] 
INFO  org.apache.pulsar.client.impl.ConnectionHandler - 
[persistent://pulsar/system/loadbalancer-service-unit-state] 
[reader-30e6b40dd9] Closed connection [id: 0x907e74b0, L:/10.0.13.6:35838 ! 
R:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local/10.0.18.18:6650] -- 
Will try again in 0.1 s
    
    (znode is gone)
    pulsar-broker-8 pulsar-broker 2023-04-22T00:19:52,652+0000 
[pulsar-load-manager-1-1] INFO  
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl
 - BrokerRegistry detected the 
broker:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local:8080 registry has 
been deleted.
    
    (lookup failure)
    org.apache.pulsar.client.impl.ConnectionHandler - 
[persistent://pulsar/system/loadbalancer-service-unit-state] 
[reader-30e6b40dd9] Could not get connection to broker: 
org.apache.pulsar.client.api.PulsarClientException$ConnectException: 
Disconnected from server at 
pulsar-broker-3.pulsar-broker.pulsar.svc.cluster.local/10.0.13.6:6650 -- Will 
try again in 0.193 s
    pulsar-broker-3 pulsar-broker 2023-04-22T00:19:52,827+0000 
[main-EventThread] INFO  org.apache.pulsar.broker.lookup.TopicLookupBase - 
Failed to lookup null for topic 
persistent://pulsar/system/loadbalancer-service-unit-state with error Failed to 
look up a broker 
registry:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local:8080 for 
bundle:pulsar/system/0x40000000_0x50000000
    pulsar-broker-3 pulsar-broker 2023-04-22T00:19:52,827+0000 
[main-EventThread] INFO  org.apache.pulsar.broker.lookup.TopicLookupBase - 
Failed to lookup null for topic 
persistent://pulsar/system/loadbalancer-service-unit-state with error Failed to 
look up a broker 
registry:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local:8080 for 
bundle:pulsar/system/0x40000000_0x50000000
    
    (leader election service has been closed)
    pulsar-broker-3 pulsar-broker 2023-04-22T00:19:55,569+0000 
[pulsar-load-manager-1-1] INFO  
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl - 
This broker:pulsar-broker-3.pulsar-broker.pulsar.svc.cluster.local:8080 plays 
the leader now.
    
    ```
    
    ### Modifications
    During the shutdown flow, when calling loadbalancer.disableBroker(), the 
load balancer extension gracefully transfers its owned bundles to other brokers. 
After that, it closes the leader election service and removes its registration 
from the metadata store.
---
 .../org/apache/pulsar/broker/PulsarService.java    |  22 +++-
 .../extensions/ExtensibleLoadManagerImpl.java      |  17 ++++
 .../extensions/ExtensibleLoadManagerWrapper.java   |   2 +-
 .../channel/ServiceUnitStateChannel.java           |   5 +
 .../channel/ServiceUnitStateChannelImpl.java       | 112 +++++++++++++++++----
 .../extensions/ExtensibleLoadManagerImplTest.java  |  45 +++++++++
 .../channel/ServiceUnitStateChannelTest.java       |  65 ++++++++----
 7 files changed, 222 insertions(+), 46 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f833674ceeb..62d4634fa2d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -382,6 +382,17 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         localMetadataStore.close();
     }
 
+    private void closeLeaderElectionService() throws Exception {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            
ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close();
+        } else {
+            if (this.leaderElectionService != null) {
+                this.leaderElectionService.close();
+                this.leaderElectionService = null;
+            }
+        }
+    }
+
     @Override
     public void close() throws PulsarServerException {
         try {
@@ -502,10 +513,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 this.bkClientFactory = null;
             }
 
-            if (this.leaderElectionService != null) {
-                this.leaderElectionService.close();
-                this.leaderElectionService = null;
-            }
+            closeLeaderElectionService();
 
             if (adminClient != null) {
                 adminClient.close();
@@ -1316,7 +1324,11 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
      * @return a reference of the current <code>LeaderElectionService</code> 
instance.
      */
     public LeaderElectionService getLeaderElectionService() {
-        return this.leaderElectionService;
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            return 
ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService();
+        } else {
+            return this.leaderElectionService;
+        }
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 348098df874..531ab18938a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -26,6 +26,7 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecisi
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -397,12 +398,22 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
     }
 
     public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId 
bundle) {
+        return selectAsync(bundle, Collections.emptySet());
+    }
+
+    public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId 
bundle,
+                                                           Set<String> 
excludeBrokerSet) {
         BrokerRegistry brokerRegistry = getBrokerRegistry();
         return brokerRegistry.getAvailableBrokerLookupDataAsync()
                 .thenCompose(availableBrokers -> {
                     LoadManagerContext context = this.getContext();
 
                     Map<String, BrokerLookupData> availableBrokerCandidates = 
new HashMap<>(availableBrokers);
+                    if (!excludeBrokerSet.isEmpty()) {
+                        for (String exclude : excludeBrokerSet) {
+                            availableBrokerCandidates.remove(exclude);
+                        }
+                    }
 
                     // Filter out brokers that do not meet the rules.
                     List<BrokerFilter> filterPipeline = 
getBrokerFilterPipeline();
@@ -702,4 +713,10 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
             log.error("Failed to get the channel ownership.", e);
         }
     }
+
+    public void disableBroker() throws Exception {
+        serviceUnitStateChannel.cleanOwnerships();
+        leaderElectionService.close();
+        brokerRegistry.unregister();
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 1eabbe620e2..18e949537de 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -74,7 +74,7 @@ public class ExtensibleLoadManagerWrapper implements 
LoadManager {
 
     @Override
     public void disableBroker() throws Exception {
-        this.loadManager.getBrokerRegistry().unregister();
+        this.loadManager.disableBroker();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 4782a31fe0c..6e75fe91a91 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -206,4 +206,9 @@ public interface ServiceUnitStateChannel extends Closeable {
      * Cancels the ownership monitor.
      */
     void cancelOwnershipMonitor();
+
+    /**
+     * Cleans the service unit ownerships from the current broker's channel.
+     */
+    void cleanOwnerships();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index f476675d01b..489a0085105 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -91,7 +91,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceBundles;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
@@ -110,6 +109,10 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
     public static final CompressionType MSG_COMPRESSION_TYPE = 
CompressionType.ZSTD;
     private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 
* 1000; // 30sec
+
+    private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
+    private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 
100;
+    private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 
3000;
     public static final long VERSION_ID_INIT = 1; // initial versionId
     private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
     public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 
mins
@@ -257,6 +260,11 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
+    @Override
+    public void cleanOwnerships() {
+        doCleanup(lookupServiceAddress);
+    }
+
     public synchronized void start() throws PulsarServerException {
         if (!validateChannelState(LeaderElectionServiceStarted, false)) {
             throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
@@ -284,7 +292,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 }
             }
             PulsarClusterMetadataSetup.createNamespaceIfAbsent
-                    (pulsar.getPulsarResources(), 
NamespaceName.SYSTEM_NAMESPACE, config.getClusterName());
+                    (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, 
config.getClusterName());
 
             ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);
 
@@ -696,6 +704,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         if (isTargetBroker(data.dstBroker())) {
             log(null, serviceUnit, data, null);
             lastOwnEventHandledAt = System.currentTimeMillis();
+        } else if (data.force() && isTargetBroker(data.sourceBroker())) {
+            closeServiceUnit(serviceUnit);
         }
     }
 
@@ -1110,21 +1120,23 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
 
     private ServiceUnitStateData 
getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
-                                                                    String 
selectedBroker) {
+                                                                    String 
selectedBroker,
+                                                                    String 
inactiveBroker) {
         if (orphanData.state() == Splitting) {
             return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), 
selectedBroker,
                     Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
                     true, getNextVersionId(orphanData));
         } else {
-            return new ServiceUnitStateData(Owned, selectedBroker, true, 
getNextVersionId(orphanData));
+            return new ServiceUnitStateData(Owned, selectedBroker, 
inactiveBroker,
+                    true, getNextVersionId(orphanData));
         }
     }
 
-    private void overrideOwnership(String serviceUnit, ServiceUnitStateData 
orphanData) {
-
-        Optional<String> selectedBroker = selectBroker(serviceUnit);
+    private void overrideOwnership(String serviceUnit, ServiceUnitStateData 
orphanData, String inactiveBroker) {
+        Optional<String> selectedBroker = selectBroker(serviceUnit, 
inactiveBroker);
         if (selectedBroker.isPresent()) {
-            var override = getOverrideInactiveBrokerStateData(orphanData, 
selectedBroker.get());
+            var override = getOverrideInactiveBrokerStateData(
+                    orphanData, selectedBroker.get(), inactiveBroker);
             log.info("Overriding ownership serviceUnit:{} from orphanData:{} 
to overrideData:{}",
                     serviceUnit, orphanData, override);
             publishOverrideEventAsync(serviceUnit, orphanData, override)
@@ -1142,26 +1154,69 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
+    private void waitForCleanups(String broker, boolean excludeSystemTopics, 
int maxWaitTimeInMillis) {
+        long started = System.currentTimeMillis();
+        while (System.currentTimeMillis() - started < maxWaitTimeInMillis) {
+            boolean cleaned = true;
+            for (var etr : tableview.entrySet()) {
+                var serviceUnit = etr.getKey();
+                var data = etr.getValue();
+
+                if (excludeSystemTopics && 
serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
+                    continue;
+                }
+
+                if (data.state() == Owned && broker.equals(data.dstBroker())) {
+                    cleaned = false;
+                    break;
+                }
+            }
+            if (cleaned) {
+                try {
+                    
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
+                } catch (InterruptedException e) {
+                    log.warn("Interrupted while gracefully waiting for the 
cleanup convergence.");
+                }
+                break;
+            } else {
+                try {
+                    
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
+                } catch (InterruptedException e) {
+                    log.warn("Interrupted while delaying the next service unit 
clean-up. Cleaning broker:{}",
+                            lookupServiceAddress);
+                }
+            }
+        }
+    }
 
-    private void doCleanup(String broker)  {
+    private synchronized void doCleanup(String broker)  {
         long startTime = System.nanoTime();
         log.info("Started ownership cleanup for the inactive broker:{}", 
broker);
         int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
 
+        Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new 
HashMap<>();
         for (var etr : tableview.entrySet()) {
             var stateData = etr.getValue();
             var serviceUnit = etr.getKey();
             var state = state(stateData);
             if (StringUtils.equals(broker, stateData.dstBroker())) {
                 if (isActiveState(state)) {
-                    overrideOwnership(serviceUnit, stateData);
+                    if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
+                        orphanSystemServiceUnits.put(serviceUnit, stateData);
+                    } else {
+                        overrideOwnership(serviceUnit, stateData, broker);
+                    }
                     orphanServiceUnitCleanupCnt++;
                 }
 
             } else if (StringUtils.equals(broker, stateData.sourceBroker())) {
                 if (isInFlightState(state)) {
-                    overrideOwnership(serviceUnit, stateData);
+                    if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
+                        orphanSystemServiceUnits.put(serviceUnit, stateData);
+                    } else {
+                        overrideOwnership(serviceUnit, stateData, broker);
+                    }
                     orphanServiceUnitCleanupCnt++;
                 }
             }
@@ -1170,14 +1225,33 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         try {
             producer.flush();
         } catch (PulsarClientException e) {
-            log.error("Failed to flush the in-flight messages.", e);
+            log.error("Failed to flush the in-flight non-system bundle 
override messages.", e);
         }
 
+
         if (orphanServiceUnitCleanupCnt > 0) {
+            // System bundles can contain this channel's system topic and 
other important system topics.
+            // Cleaning such system bundles(closing the system topics) 
together with the non-system bundles
+            // can cause the cluster to be temporarily unstable.
+            // Hence, we clean the non-system bundles first and gracefully 
wait for them.
+            // After that, we clean the system bundles, if any.
+            waitForCleanups(broker, true, 
OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
             this.totalOrphanServiceUnitCleanupCnt += 
orphanServiceUnitCleanupCnt;
             this.totalInactiveBrokerCleanupCnt++;
         }
 
+        // clean system bundles in the end
+        for (var orphanSystemServiceUnit : 
orphanSystemServiceUnits.entrySet()) {
+            log.info("Overriding orphan system service unit:{}", 
orphanSystemServiceUnit.getKey());
+            overrideOwnership(orphanSystemServiceUnit.getKey(), 
orphanSystemServiceUnit.getValue(), broker);
+        }
+
+        try {
+            producer.flush();
+        } catch (PulsarClientException e) {
+            log.error("Failed to flush the in-flight system bundle override 
messages.", e);
+        }
+
         double cleanupTime = TimeUnit.NANOSECONDS
                 .toMillis((System.nanoTime() - startTime));
 
@@ -1196,9 +1270,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
     }
 
-    private Optional<String> selectBroker(String serviceUnit) {
+    private Optional<String> selectBroker(String serviceUnit, String 
inactiveBroker) {
         try {
-            return loadManager.selectAsync(getNamespaceBundle(serviceUnit))
+            return loadManager.selectAsync(getNamespaceBundle(serviceUnit), 
Set.of(inactiveBroker))
                     .get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
         } catch (Throwable e) {
             log.error("Failed to select a broker for serviceUnit:{}", 
serviceUnit);
@@ -1206,8 +1280,10 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         return Optional.empty();
     }
 
-    private Optional<ServiceUnitStateData> getRollForwardStateData(String 
serviceUnit, long nextVersionId) {
-        Optional<String> selectedBroker = selectBroker(serviceUnit);
+    private Optional<ServiceUnitStateData> getRollForwardStateData(String 
serviceUnit,
+                                                                   String 
inactiveBroker,
+                                                                   long 
nextVersionId) {
+        Optional<String> selectedBroker = selectBroker(serviceUnit, 
inactiveBroker);
         if (selectedBroker.isEmpty()) {
             return Optional.empty();
         }
@@ -1222,7 +1298,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         var state = orphanData.state();
         switch (state) {
             case Assigning: {
-                return getRollForwardStateData(serviceUnit, nextVersionId);
+                return getRollForwardStateData(serviceUnit, 
orphanData.dstBroker(), nextVersionId);
             }
             case Splitting: {
                 return Optional.of(new ServiceUnitStateData(Splitting,
@@ -1235,7 +1311,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                     // rollback to the src
                     return Optional.of(new ServiceUnitStateData(Owned, 
orphanData.sourceBroker(), true, nextVersionId));
                 } else {
-                    return getRollForwardStateData(serviceUnit, nextVersionId);
+                    return getRollForwardStateData(serviceUnit, 
orphanData.sourceBroker(), nextVersionId);
                 }
             }
             default: {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index d72ce9661b9..498f48d16d2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -882,6 +882,51 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(actual, expected);
     }
 
+    @Test
+    public void testDisableBroker() throws Exception {
+        // Test rollback to modular load manager.
+        ServiceConfiguration defaultConf = getDefaultConf();
+        defaultConf.setAllowAutoTopicCreation(true);
+        defaultConf.setForceDeleteNamespaceAllowed(true);
+        
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        defaultConf.setLoadBalancerSheddingEnabled(false);
+        try (var additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf)) {
+            var pulsar3 = additionalPulsarTestContext.getPulsarService();
+            ExtensibleLoadManagerImpl ternaryLoadManager = 
spy((ExtensibleLoadManagerImpl)
+                    FieldUtils.readField(pulsar3.getLoadManager().get(), 
"loadManager", true));
+            String topic = "persistent://public/default/test";
+
+            String lookupResult1 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
+            TopicName topicName = TopicName.get("test");
+            NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+            if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
+                
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange(),
+                        pulsar3.getLookupServiceAddress());
+                lookupResult1 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+            }
+            String lookupResult2 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+            String lookupResult3 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+
+            assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
+            assertEquals(lookupResult1, lookupResult2);
+            assertEquals(lookupResult1, lookupResult3);
+
+
+            
assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+            
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+            
assertTrue(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+            ternaryLoadManager.disableBroker();
+
+            
assertFalse(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+            if (primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get()) {
+                
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+            } else {
+                
assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+            }
+        }
+    }
+
     private static abstract class MockBrokerFilter implements BrokerFilter {
 
         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 0f682cf048f..1263170bec4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -517,7 +517,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         // recovered, check the monitor update state : Assigned -> Owned
         
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
-                .when(loadManager).selectAsync(any());
+                .when(loadManager).selectAsync(any(), any());
         FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
         FieldUtils.writeDeclaredField(channel1,
                 "inFlightStateWaitingTimeInMillis", 1 , true);
@@ -715,8 +715,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         var cleanupJobs1 = getCleanupJobs(channel1);
         var cleanupJobs2 = getCleanupJobs(channel2);
-        var leaderCleanupJobs = spy(cleanupJobs1);
-        var followerCleanupJobs = spy(cleanupJobs2);
+        var leaderCleanupJobsTmp = spy(cleanupJobs1);
+        var followerCleanupJobsTmp = spy(cleanupJobs2);
         var leaderChannel = channel1;
         var followerChannel = channel2;
         String leader = channel1.getChannelOwnerAsync().get(2, 
TimeUnit.SECONDS).get();
@@ -725,10 +725,12 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         if (leader.equals(lookupServiceAddress2)) {
             leaderChannel = channel2;
             followerChannel = channel1;
-            var tmp = followerCleanupJobs;
-            followerCleanupJobs = leaderCleanupJobs;
-            leaderCleanupJobs = tmp;
+            var tmp = followerCleanupJobsTmp;
+            followerCleanupJobsTmp = leaderCleanupJobsTmp;
+            leaderCleanupJobsTmp = tmp;
         }
+        final var leaderCleanupJobs = leaderCleanupJobsTmp;
+        final var followerCleanupJobs = followerCleanupJobsTmp;
         FieldUtils.writeDeclaredField(leaderChannel, "cleanupJobs", 
leaderCleanupJobs,
                 true);
         FieldUtils.writeDeclaredField(followerChannel, "cleanupJobs", 
followerCleanupJobs,
@@ -737,7 +739,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var owner1 = channel1.getOwnerAsync(bundle1);
         var owner2 = channel2.getOwnerAsync(bundle2);
         
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
-                .when(loadManager).selectAsync(any());
+                .when(loadManager).selectAsync(any(), any());
         assertTrue(owner1.get().isEmpty());
         assertTrue(owner2.get().isEmpty());
 
@@ -771,9 +773,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), 
any());
 
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(0, leaderCleanupJobs.size());
+            assertEquals(0, followerCleanupJobs.size());
+        });
 
-        assertEquals(0, leaderCleanupJobs.size());
-        assertEquals(0, followerCleanupJobs.size());
         validateMonitorCounters(leaderChannel,
                 1,
                 0,
@@ -799,8 +803,12 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         verify(leaderCleanupJobs, times(2)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), 
any());
-        assertEquals(1, leaderCleanupJobs.size());
-        assertEquals(0, followerCleanupJobs.size());
+
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(1, leaderCleanupJobs.size());
+            assertEquals(0, followerCleanupJobs.size());
+        });
+
         validateMonitorCounters(leaderChannel,
                 1,
                 0,
@@ -816,8 +824,12 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         verify(leaderCleanupJobs, times(2)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), 
any());
-        assertEquals(0, leaderCleanupJobs.size());
-        assertEquals(0, followerCleanupJobs.size());
+
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(0, leaderCleanupJobs.size());
+            assertEquals(0, followerCleanupJobs.size());
+        });
+
         validateMonitorCounters(leaderChannel,
                 1,
                 0,
@@ -835,8 +847,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), 
any());
-        assertEquals(1, leaderCleanupJobs.size());
-        assertEquals(0, followerCleanupJobs.size());
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(1, leaderCleanupJobs.size());
+            assertEquals(0, followerCleanupJobs.size());
+        });
+
         validateMonitorCounters(leaderChannel,
                 1,
                 0,
@@ -854,8 +869,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), 
any());
-        assertEquals(0, leaderCleanupJobs.size());
-        assertEquals(0, followerCleanupJobs.size());
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(0, leaderCleanupJobs.size());
+            assertEquals(0, followerCleanupJobs.size());
+        });
+
         validateMonitorCounters(leaderChannel,
                 2,
                 0,
@@ -880,8 +898,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), 
any());
-        assertEquals(0, leaderCleanupJobs.size());
-        assertEquals(0, followerCleanupJobs.size());
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(0, leaderCleanupJobs.size());
+            assertEquals(0, followerCleanupJobs.size());
+        });
+
         validateMonitorCounters(leaderChannel,
                 2,
                 0,
@@ -1103,7 +1124,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
         
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
-                .when(loadManager).selectAsync(any());
+                .when(loadManager).selectAsync(any(), any());
         channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
         // channel1 is broken. the assign won't be complete.
         waitUntilState(channel1, bundle);
@@ -1442,7 +1463,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         // test stable metadata state
         
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
-                .when(loadManager).selectAsync(any());
+                .when(loadManager).selectAsync(any(), any());
         leaderChannel.handleMetadataSessionEvent(SessionReestablished);
         followerChannel.handleMetadataSessionEvent(SessionReestablished);
         FieldUtils.writeDeclaredField(leaderChannel, 
"lastMetadataSessionEventTimestamp",
@@ -1507,7 +1528,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         // test stable metadata state
         
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
-                .when(loadManager).selectAsync(any());
+                .when(loadManager).selectAsync(any(), any());
         FieldUtils.writeDeclaredField(leaderChannel, 
"inFlightStateWaitingTimeInMillis",
                 -1, true);
         FieldUtils.writeDeclaredField(followerChannel, 
"inFlightStateWaitingTimeInMillis",


Reply via email to