This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2b2b83d60e708d4c0e8ee304b2d556c0fc71c5d4
Author: Kai Wang <[email protected]>
AuthorDate: Thu Oct 19 07:08:49 2023 -0500

    [fix][broker] Fix unload operation stuck when use ExtensibleLoadManager 
(#21332)
    
    (cherry picked from commit ecd40e43a6b90b58e209bc9bced84b35d933619e)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |   2 +-
 .../channel/ServiceUnitStateChannelImpl.java       |  17 +-
 .../extensions/manager/UnloadManager.java          |   7 +
 .../pulsar/broker/namespace/NamespaceService.java  |   4 -
 .../pulsar/broker/service/BrokerService.java       |  15 +
 .../extensions/ExtensibleLoadManagerImplTest.java  | 309 ++++++++++++---------
 .../channel/ServiceUnitStateChannelTest.java       |  12 +-
 7 files changed, 218 insertions(+), 148 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 85baf9ec4fb..d3119365ddf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -304,7 +304,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                             }
                         });
                     });
-            this.serviceUnitStateChannel = new 
ServiceUnitStateChannelImpl(pulsar);
+            this.serviceUnitStateChannel = 
ServiceUnitStateChannelImpl.newInstance(pulsar);
             this.brokerRegistry.start();
             this.splitManager = new SplitManager(splitCounter);
             this.unloadManager = new UnloadManager(unloadCounter);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index f7e09a2bec5..3cf16709cde 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -200,7 +200,18 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         Unstable
     }
 
+    public static ServiceUnitStateChannelImpl newInstance(PulsarService 
pulsar) {
+        return new ServiceUnitStateChannelImpl(pulsar);
+    }
+
     public ServiceUnitStateChannelImpl(PulsarService pulsar) {
+        this(pulsar, MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS, 
OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS);
+    }
+
+    @VisibleForTesting
+    public ServiceUnitStateChannelImpl(PulsarService pulsar,
+                                       long inFlightStateWaitingTimeInMillis,
+                                       long ownershipMonitorDelayTimeInSecs) {
         this.pulsar = pulsar;
         this.config = pulsar.getConfig();
         this.lookupServiceAddress = pulsar.getLookupServiceAddress();
@@ -210,8 +221,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         this.stateChangeListeners = new StateChangeListeners();
         this.semiTerminalStateWaitingTimeInMillis = 
config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
                 * 1000;
-        this.inFlightStateWaitingTimeInMillis = 
MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
-        this.ownershipMonitorDelayTimeInSecs = 
OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS;
+        this.inFlightStateWaitingTimeInMillis = 
inFlightStateWaitingTimeInMillis;
+        this.ownershipMonitorDelayTimeInSecs = ownershipMonitorDelayTimeInSecs;
         if (semiTerminalStateWaitingTimeInMillis < 
inFlightStateWaitingTimeInMillis) {
             throw new IllegalArgumentException(
                     "Invalid Config: 
loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds < "
@@ -837,7 +848,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         } finally {
             var future = requested.getValue();
             if (future != null) {
-                future.orTimeout(inFlightStateWaitingTimeInMillis, 
TimeUnit.MILLISECONDS)
+                future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, 
TimeUnit.MILLISECONDS)
                         .whenComplete((v, e) -> {
                                     if (e != null) {
                                         getOwnerRequests.remove(serviceUnit, 
future);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index 2dde0c4708e..ffdbbc2af42 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -88,6 +88,13 @@ public class UnloadManager implements StateChangeListener {
 
     @Override
     public void handleEvent(String serviceUnit, ServiceUnitStateData data, 
Throwable t) {
+        if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Handling {} for service unit {} with exception.", 
data, serviceUnit, t);
+            }
+            this.complete(serviceUnit, t);
+            return;
+        }
         ServiceUnitState state = ServiceUnitStateData.state(data);
         switch (state) {
             case Free, Owned -> this.complete(serviceUnit, t);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 57c0cc7c046..c69e30173aa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1558,10 +1558,6 @@ public class NamespaceService implements AutoCloseable {
 
     public CompletableFuture<Boolean> 
checkOwnershipPresentAsync(NamespaceBundle bundle) {
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
-            if (bundle.getNamespaceObject().equals(SYSTEM_NAMESPACE)) {
-                return FutureUtil.failedFuture(new 
UnsupportedOperationException(
-                        "Ownership check for system namespace is not 
supported"));
-            }
             ExtensibleLoadManagerImpl extensibleLoadManager = 
ExtensibleLoadManagerImpl.get(loadManager.get());
             return extensibleLoadManager.getOwnershipAsync(Optional.empty(), 
bundle)
                     .thenApply(Optional::isPresent);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6a61e851fbc..3c0ea0f8fc4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2193,6 +2193,21 @@ public class BrokerService implements Closeable {
             if (serviceUnit.includes(topicName)) {
                 // Topic needs to be unloaded
                 log.info("[{}] Unloading topic", topicName);
+                if (topicFuture.isCompletedExceptionally()) {
+                    try {
+                        topicFuture.get();
+                    } catch (InterruptedException | ExecutionException ex) {
+                        if (ex.getCause() instanceof 
ServiceUnitNotReadyException) {
+                            // Topic was already unloaded
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Topic was already unloaded", 
topicName);
+                            }
+                            return;
+                        } else {
+                            log.warn("[{}] Got exception when closing topic", 
topicName, ex);
+                        }
+                    }
+                }
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? 
t.get().close(closeWithoutWaitingClientDisconnect)
                                 : CompletableFuture.completedFuture(null)));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 011e7174cbe..20ba9500cb1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -38,9 +38,11 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
 import static 
org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace;
 import static 
org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2;
 import static 
org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -113,6 +115,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -142,46 +145,56 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     @BeforeClass
     @Override
     public void setup() throws Exception {
-        conf.setForceDeleteNamespaceAllowed(true);
-        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
-        conf.setAllowAutoTopicCreation(true);
-        
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-        
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-        conf.setLoadBalancerSheddingEnabled(false);
-        conf.setLoadBalancerDebugModeEnabled(true);
-        conf.setTopicLevelPoliciesEnabled(false);
-        super.internalSetup(conf);
-        pulsar1 = pulsar;
-        ServiceConfiguration defaultConf = getDefaultConf();
-        defaultConf.setAllowAutoTopicCreation(true);
-        defaultConf.setForceDeleteNamespaceAllowed(true);
-        
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-        
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-        defaultConf.setLoadBalancerSheddingEnabled(false);
-        defaultConf.setTopicLevelPoliciesEnabled(false);
-        additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf);
-        pulsar2 = additionalPulsarTestContext.getPulsarService();
-
-        setPrimaryLoadManager();
-
-        setSecondaryLoadManager();
-
-        admin.clusters().createCluster(this.conf.getClusterName(),
-                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
-                        Sets.newHashSet(this.conf.getClusterName())));
-        admin.namespaces().createNamespace("public/default");
-        admin.namespaces().setNamespaceReplicationClusters("public/default",
-                Sets.newHashSet(this.conf.getClusterName()));
-
-        admin.namespaces().createNamespace(defaultTestNamespace);
-        
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
-                Sets.newHashSet(this.conf.getClusterName()));
+        try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
+                     mockStatic(ServiceUnitStateChannelImpl.class)) {
+            channelMockedStatic.when(() -> 
ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
+                    .thenAnswer(invocation -> {
+                        PulsarService pulsarService = 
invocation.getArgument(0);
+                        // Set the inflight state waiting time and ownership 
monitor delay time to 5 seconds to avoid
+                        // stuck when doing unload.
+                        return new ServiceUnitStateChannelImpl(pulsarService, 
5 * 1000, 1);
+                    });
+            conf.setForceDeleteNamespaceAllowed(true);
+            conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+            conf.setAllowAutoTopicCreation(true);
+            
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+            
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+            conf.setLoadBalancerSheddingEnabled(false);
+            conf.setLoadBalancerDebugModeEnabled(true);
+            conf.setTopicLevelPoliciesEnabled(true);
+            super.internalSetup(conf);
+            pulsar1 = pulsar;
+            ServiceConfiguration defaultConf = getDefaultConf();
+            defaultConf.setAllowAutoTopicCreation(true);
+            defaultConf.setForceDeleteNamespaceAllowed(true);
+            
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+            
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+            defaultConf.setLoadBalancerSheddingEnabled(false);
+            defaultConf.setTopicLevelPoliciesEnabled(true);
+            additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf);
+            pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+            setPrimaryLoadManager();
+
+            setSecondaryLoadManager();
+
+            admin.clusters().createCluster(this.conf.getClusterName(),
+                    
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+            admin.tenants().createTenant("public",
+                    new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+                            Sets.newHashSet(this.conf.getClusterName())));
+            admin.namespaces().createNamespace("public/default");
+            
admin.namespaces().setNamespaceReplicationClusters("public/default",
+                    Sets.newHashSet(this.conf.getClusterName()));
+
+            admin.namespaces().createNamespace(defaultTestNamespace);
+            
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+                    Sets.newHashSet(this.conf.getClusterName()));
+        }
     }
 
     @Override
-    @AfterClass
+    @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
         pulsar1 = null;
         pulsar2.close();
@@ -557,119 +570,134 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testDeployAndRollbackLoadManager() throws Exception {
-        // Test rollback to modular load manager.
-        ServiceConfiguration defaultConf = getDefaultConf();
-        defaultConf.setAllowAutoTopicCreation(true);
-        defaultConf.setForceDeleteNamespaceAllowed(true);
-        
defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
-        defaultConf.setLoadBalancerSheddingEnabled(false);
-        try (var additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf)) {
-            // start pulsar3 with old load manager
-            var pulsar3 = additionalPulsarTestContext.getPulsarService();
-            String topic = "persistent://" + defaultTestNamespace + "/test";
-
-            String lookupResult1 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
-            assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
-
-            String lookupResult2 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
-            String lookupResult3 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
-            assertEquals(lookupResult1, lookupResult2);
-            assertEquals(lookupResult1, lookupResult3);
-
-            NamespaceBundle bundle = getBundleAsync(pulsar1, 
TopicName.get(topic)).get();
-            LookupOptions options = LookupOptions.builder()
-                    .authoritative(false)
-                    .requestHttps(false)
-                    .readOnly(false)
-                    .loadTopicsInBundle(false).build();
-            Optional<URL> webServiceUrl1 =
-                    pulsar1.getNamespaceService().getWebServiceUrl(bundle, 
options);
-            assertTrue(webServiceUrl1.isPresent());
-            assertEquals(webServiceUrl1.get().toString(), 
pulsar3.getWebServiceAddress());
-
-            Optional<URL> webServiceUrl2 =
-                    pulsar2.getNamespaceService().getWebServiceUrl(bundle, 
options);
-            assertTrue(webServiceUrl2.isPresent());
-            assertEquals(webServiceUrl2.get().toString(), 
webServiceUrl1.get().toString());
-
-            Optional<URL> webServiceUrl3 =
-                    pulsar3.getNamespaceService().getWebServiceUrl(bundle, 
options);
-            assertTrue(webServiceUrl3.isPresent());
-            assertEquals(webServiceUrl3.get().toString(), 
webServiceUrl1.get().toString());
-
-            List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, 
pulsar3);
-            for (PulsarService pulsarService : pulsarServices) {
-                // Test lookup heartbeat namespace's topic
-                for (PulsarService pulsar : pulsarServices) {
-                    assertLookupHeartbeatOwner(pulsarService, 
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
-                }
-                // Test lookup SLA namespace's topic
-                for (PulsarService pulsar : pulsarServices) {
-                    assertLookupSLANamespaceOwner(pulsarService, 
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
-                }
-            }
-
-            // Test deploy new broker with new load manager
-            ServiceConfiguration conf = getDefaultConf();
-            conf.setAllowAutoTopicCreation(true);
-            conf.setForceDeleteNamespaceAllowed(true);
-            
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-            
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-            try (var additionPulsarTestContext = 
createAdditionalPulsarTestContext(conf)) {
-                var pulsar4 = additionPulsarTestContext.getPulsarService();
-
-                Set<String> availableCandidates = 
Sets.newHashSet(pulsar1.getBrokerServiceUrl(),
-                        pulsar2.getBrokerServiceUrl(),
-                        pulsar4.getBrokerServiceUrl());
-                String lookupResult4 = 
pulsar4.getAdminClient().lookups().lookupTopic(topic);
-                assertTrue(availableCandidates.contains(lookupResult4));
-
-                String lookupResult5 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
-                String lookupResult6 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
-                String lookupResult7 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
-                assertEquals(lookupResult4, lookupResult5);
-                assertEquals(lookupResult4, lookupResult6);
-                assertEquals(lookupResult4, lookupResult7);
-
-                Set<String> availableWebUrlCandidates = 
Sets.newHashSet(pulsar1.getWebServiceAddress(),
-                        pulsar2.getWebServiceAddress(),
-                        pulsar4.getWebServiceAddress());
-
-                webServiceUrl1 =
+        try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
+                     mockStatic(ServiceUnitStateChannelImpl.class)) {
+            channelMockedStatic.when(() -> 
ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
+                    .thenAnswer(invocation -> {
+                        PulsarService pulsarService = 
invocation.getArgument(0);
+                        // Set the inflight state waiting time and ownership 
monitor delay time to 5 seconds to avoid
+                        // stuck when doing unload.
+                        return new ServiceUnitStateChannelImpl(pulsarService, 
5 * 1000, 1);
+                    });
+            // Test rollback to modular load manager.
+            ServiceConfiguration defaultConf = getDefaultConf();
+            defaultConf.setAllowAutoTopicCreation(true);
+            defaultConf.setForceDeleteNamespaceAllowed(true);
+            
defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+            defaultConf.setLoadBalancerSheddingEnabled(false);
+            try (var additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf)) {
+                // start pulsar3 with old load manager
+                var pulsar3 = additionalPulsarTestContext.getPulsarService();
+                String topic = "persistent://" + defaultTestNamespace + 
"/test";
+
+                String lookupResult1 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
+                assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
+
+                String lookupResult2 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+                String lookupResult3 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+                assertEquals(lookupResult1, lookupResult2);
+                assertEquals(lookupResult1, lookupResult3);
+
+                NamespaceBundle bundle = getBundleAsync(pulsar1, 
TopicName.get(topic)).get();
+                LookupOptions options = LookupOptions.builder()
+                        .authoritative(false)
+                        .requestHttps(false)
+                        .readOnly(false)
+                        .loadTopicsInBundle(false).build();
+                Optional<URL> webServiceUrl1 =
                         pulsar1.getNamespaceService().getWebServiceUrl(bundle, 
options);
                 assertTrue(webServiceUrl1.isPresent());
-                
assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
+                assertEquals(webServiceUrl1.get().toString(), 
pulsar3.getWebServiceAddress());
 
-                webServiceUrl2 =
+                Optional<URL> webServiceUrl2 =
                         pulsar2.getNamespaceService().getWebServiceUrl(bundle, 
options);
                 assertTrue(webServiceUrl2.isPresent());
                 assertEquals(webServiceUrl2.get().toString(), 
webServiceUrl1.get().toString());
 
-                // The pulsar3 will redirect to pulsar4
-                webServiceUrl3 =
+                Optional<URL> webServiceUrl3 =
                         pulsar3.getNamespaceService().getWebServiceUrl(bundle, 
options);
                 assertTrue(webServiceUrl3.isPresent());
-                // It will redirect to pulsar4
-                
assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
-
-                var webServiceUrl4 =
-                        pulsar4.getNamespaceService().getWebServiceUrl(bundle, 
options);
-                assertTrue(webServiceUrl4.isPresent());
-                assertEquals(webServiceUrl4.get().toString(), 
webServiceUrl1.get().toString());
+                assertEquals(webServiceUrl3.get().toString(), 
webServiceUrl1.get().toString());
 
-                pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+                List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, 
pulsar3);
                 for (PulsarService pulsarService : pulsarServices) {
                     // Test lookup heartbeat namespace's topic
                     for (PulsarService pulsar : pulsarServices) {
-                        assertLookupHeartbeatOwner(pulsarService, 
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                        assertLookupHeartbeatOwner(pulsarService,
+                                pulsar.getLookupServiceAddress(), 
pulsar.getBrokerServiceUrl());
                     }
                     // Test lookup SLA namespace's topic
                     for (PulsarService pulsar : pulsarServices) {
-                        assertLookupSLANamespaceOwner(pulsarService, 
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                        assertLookupSLANamespaceOwner(pulsarService,
+                                pulsar.getLookupServiceAddress(), 
pulsar.getBrokerServiceUrl());
+                    }
+                }
+
+                // Test deploy new broker with new load manager
+                ServiceConfiguration conf = getDefaultConf();
+                conf.setAllowAutoTopicCreation(true);
+                conf.setForceDeleteNamespaceAllowed(true);
+                
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+                
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+                try (var additionPulsarTestContext = 
createAdditionalPulsarTestContext(conf)) {
+                    var pulsar4 = additionPulsarTestContext.getPulsarService();
+
+                    Set<String> availableCandidates = 
Sets.newHashSet(pulsar1.getBrokerServiceUrl(),
+                            pulsar2.getBrokerServiceUrl(),
+                            pulsar4.getBrokerServiceUrl());
+                    String lookupResult4 = 
pulsar4.getAdminClient().lookups().lookupTopic(topic);
+                    assertTrue(availableCandidates.contains(lookupResult4));
+
+                    String lookupResult5 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+                    String lookupResult6 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+                    String lookupResult7 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
+                    assertEquals(lookupResult4, lookupResult5);
+                    assertEquals(lookupResult4, lookupResult6);
+                    assertEquals(lookupResult4, lookupResult7);
+
+                    Set<String> availableWebUrlCandidates = 
Sets.newHashSet(pulsar1.getWebServiceAddress(),
+                            pulsar2.getWebServiceAddress(),
+                            pulsar4.getWebServiceAddress());
+
+                    webServiceUrl1 =
+                            
pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
+                    assertTrue(webServiceUrl1.isPresent());
+                    
assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
+
+                    webServiceUrl2 =
+                            
pulsar2.getNamespaceService().getWebServiceUrl(bundle, options);
+                    assertTrue(webServiceUrl2.isPresent());
+                    assertEquals(webServiceUrl2.get().toString(), 
webServiceUrl1.get().toString());
+
+                    // The pulsar3 will redirect to pulsar4
+                    webServiceUrl3 =
+                            
pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
+                    assertTrue(webServiceUrl3.isPresent());
+                    // It will redirect to pulsar4
+                    
assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
+
+                    var webServiceUrl4 =
+                            
pulsar4.getNamespaceService().getWebServiceUrl(bundle, options);
+                    assertTrue(webServiceUrl4.isPresent());
+                    assertEquals(webServiceUrl4.get().toString(), 
webServiceUrl1.get().toString());
+
+                    pulsarServices = List.of(pulsar1, pulsar2, pulsar3, 
pulsar4);
+                    for (PulsarService pulsarService : pulsarServices) {
+                        // Test lookup heartbeat namespace's topic
+                        for (PulsarService pulsar : pulsarServices) {
+                            assertLookupHeartbeatOwner(pulsarService,
+                                    pulsar.getLookupServiceAddress(), 
pulsar.getBrokerServiceUrl());
+                        }
+                        // Test lookup SLA namespace's topic
+                        for (PulsarService pulsar : pulsarServices) {
+                            assertLookupSLANamespaceOwner(pulsarService,
+                                    pulsar.getLookupServiceAddress(), 
pulsar.getBrokerServiceUrl());
+                        }
                     }
                 }
             }
         }
+
     }
 
     private void assertLookupHeartbeatOwner(PulsarService pulsar,
@@ -1108,6 +1136,12 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         NamespaceName heartbeatNamespacePulsar2V2 =
                 
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), 
pulsar2.getConfiguration());
 
+        NamespaceName slaMonitorNamespacePulsar1 =
+                getSLAMonitorNamespace(pulsar1.getLookupServiceAddress(), 
pulsar1.getConfiguration());
+
+        NamespaceName slaMonitorNamespacePulsar2 =
+                getSLAMonitorNamespace(pulsar2.getLookupServiceAddress(), 
pulsar2.getConfiguration());
+
         NamespaceBundle bundle1 = 
pulsar1.getNamespaceService().getNamespaceBundleFactory()
                 .getFullBundle(heartbeatNamespacePulsar1V1);
         NamespaceBundle bundle2 = 
pulsar1.getNamespaceService().getNamespaceBundleFactory()
@@ -1118,27 +1152,34 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         NamespaceBundle bundle4 = 
pulsar2.getNamespaceService().getNamespaceBundleFactory()
                 .getFullBundle(heartbeatNamespacePulsar2V2);
 
+        NamespaceBundle slaBundle1 = 
pulsar1.getNamespaceService().getNamespaceBundleFactory()
+                .getFullBundle(slaMonitorNamespacePulsar1);
+        NamespaceBundle slaBundle2 = 
pulsar2.getNamespaceService().getNamespaceBundleFactory()
+                .getFullBundle(slaMonitorNamespacePulsar2);
+
+
         Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = 
primaryLoadManager.getOwnedServiceUnits();
         log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
         // heartbeat namespace bundle will own by pulsar1
-        assertEquals(ownedServiceUnitsByPulsar1.size(), 3);
         assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
         assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
+        assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1));
         Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = 
secondaryLoadManager.getOwnedServiceUnits();
         log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
-        assertEquals(ownedServiceUnitsByPulsar2.size(), 3);
         assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
         assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
+        assertTrue(ownedServiceUnitsByPulsar2.contains(slaBundle2));
         Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
                 admin.brokers().getOwnedNamespaces(conf.getClusterName(), 
pulsar1.getLookupServiceAddress());
         Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
                 admin.brokers().getOwnedNamespaces(conf.getClusterName(), 
pulsar2.getLookupServiceAddress());
-        assertEquals(ownedNamespacesByPulsar1.size(), 3);
         assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
         assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
-        assertEquals(ownedNamespacesByPulsar2.size(), 3);
+        
assertTrue(ownedNamespacesByPulsar1.containsKey(slaBundle1.toString()));
+
         assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
         assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));
+        
assertTrue(ownedNamespacesByPulsar2.containsKey(slaBundle2.toString()));
 
         String topic = "persistent://" + defaultTestNamespace + 
"/test-get-owned-service-units";
         admin.topics().createPartitionedTopic(topic, 1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index f9893ea3f63..1da7059bf49 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -507,10 +507,10 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(1, getOwnerRequests1.size());
         assertEquals(1, getOwnerRequests2.size());
 
-        // In 5 secs, the getOwnerAsync requests(lookup requests) should time 
out.
-        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        // In 10 secs, the getOwnerAsync requests(lookup requests) should time 
out.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertTrue(owner1.isCompletedExceptionally()));
-        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertTrue(owner2.isCompletedExceptionally()));
 
         assertEquals(0, getOwnerRequests1.size());
@@ -1139,10 +1139,10 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertFalse(owner1.isDone());
         assertFalse(owner2.isDone());
 
-        // In 5 secs, the getOwnerAsync requests(lookup requests) should time 
out.
-        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        // In 10 secs, the getOwnerAsync requests(lookup requests) should time 
out.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertTrue(owner1.isCompletedExceptionally()));
-        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertTrue(owner2.isCompletedExceptionally()));
 
         // recovered, check the monitor update state : Assigned -> Owned

Reply via email to