This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 58a45aa2d2a [fix][broker][branch-3.0] Return getOwnerAsync without waiting on source broker upon Assigning and Releasing and handle role change during role init (#22112) (#22156)
58a45aa2d2a is described below

commit 58a45aa2d2abf1014675cd367132c22000907a3f
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Feb 28 18:18:22 2024 -0800

    [fix][broker][branch-3.0] Return getOwnerAsync without waiting on source broker upon Assigning and Releasing and handle role change during role init (#22112) (#22156)
    
    (cherry picked from commit b3b1bfb3e2a29674cc9d6144baeef1a3f0058c07)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  24 ++++
 .../channel/ServiceUnitStateChannelImpl.java       |  11 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  | 128 ++++++++++++++++-----
 .../channel/ServiceUnitStateChannelTest.java       |  77 +++++++++++--
 .../loadbalance/ExtensibleLoadManagerTest.java     |  35 +++---
 5 files changed, 224 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 409bb55075b..6a0e677c662 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -777,8 +777,13 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         log.info("This broker:{} is setting the role from {} to {}",
                 pulsar.getBrokerId(), role, Leader);
         int retry = 0;
+        boolean becameFollower = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
+                if (!serviceUnitStateChannel.isChannelOwner()) {
+                    becameFollower = true;
+                    break;
+                }
                 initWaiter.await();
                 // Confirm the system topics have been created or create them 
if they do not exist.
                 // If the leader has changed, the new leader need to reset
@@ -802,6 +807,13 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                 }
             }
         }
+
+        if (becameFollower) {
+            log.warn("The broker:{} became follower while initializing leader 
role.", pulsar.getBrokerId());
+            playFollower();
+            return;
+        }
+
         role = Leader;
         log.info("This broker:{} plays the leader now.", pulsar.getBrokerId());
 
@@ -815,8 +827,13 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         log.info("This broker:{} is setting the role from {} to {}",
                 pulsar.getBrokerId(), role, Follower);
         int retry = 0;
+        boolean becameLeader = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
+                if (serviceUnitStateChannel.isChannelOwner()) {
+                    becameLeader = true;
+                    break;
+                }
                 initWaiter.await();
                 unloadScheduler.close();
                 serviceUnitStateChannel.cancelOwnershipMonitor();
@@ -836,6 +853,13 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                 }
             }
         }
+
+        if (becameLeader) {
+            log.warn("This broker:{} became leader while initializing follower 
role.", pulsar.getBrokerId());
+            playLeader();
+            return;
+        }
+
         role = Follower;
         log.info("This broker:{} plays a follower now.", pulsar.getBrokerId());
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index a94ce7446bd..1471d4a75c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -533,7 +533,16 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 return getActiveOwnerAsync(serviceUnit, state, 
Optional.of(data.sourceBroker()));
             }
             case Assigning, Releasing -> {
-                return getActiveOwnerAsync(serviceUnit, state, 
Optional.empty());
+                if (isTargetBroker(data.dstBroker())) {
+                    return getActiveOwnerAsync(serviceUnit, state, 
Optional.of(data.dstBroker()));
+                }
+                // If this broker is not the dst broker, return the dst broker 
as the owner(or empty).
+                // Clients need to connect(redirect) to the dst broker anyway
+                // and wait for the dst broker to receive `Owned`.
+                // This is also required to return getOwnerAsync on the src 
broker immediately during unloading.
+                // Otherwise, topic creation(getOwnerAsync) could block 
unloading bundles,
+                // if the topic creation(getOwnerAsync) happens during 
unloading on the src broker.
+                return 
CompletableFuture.completedFuture(Optional.ofNullable(data.dstBroker()));
             }
             case Init, Free -> {
                 return CompletableFuture.completedFuture(Optional.empty());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 7fe5b5a52ff..850bf9a96c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions;
 
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelTest.overrideTableView;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
@@ -70,6 +72,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -78,6 +81,7 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
@@ -96,12 +100,14 @@ import 
org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
 import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BrokerAssignment;
@@ -794,7 +800,6 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
             reset();
             return null;
         }).when(topBundlesLoadDataStorePrimarySpy).closeTableView();
-        FieldUtils.writeDeclaredField(primaryLoadManager, 
"topBundlesLoadDataStore", topBundlesLoadDataStorePrimarySpy, true);
 
         var topBundlesLoadDataStoreSecondary = 
(LoadDataStore<TopBundlesLoadData>)
                 FieldUtils.readDeclaredField(secondaryLoadManager, 
"topBundlesLoadDataStore", true);
@@ -817,36 +822,65 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
             reset();
             return null;
         }).when(topBundlesLoadDataStoreSecondarySpy).closeTableView();
-        FieldUtils.writeDeclaredField(secondaryLoadManager, 
"topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true);
 
-        if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
-            primaryLoadManager.playFollower(); // close 3 times
-            primaryLoadManager.playFollower(); // close 1 time
-            secondaryLoadManager.playLeader();
-            secondaryLoadManager.playLeader();
-            primaryLoadManager.playLeader(); // close 3 times and open 3 times
-            primaryLoadManager.playLeader(); // close 1 time and open 1 time,
-            secondaryLoadManager.playFollower();
-            secondaryLoadManager.playFollower();
-        } else {
-            primaryLoadManager.playLeader();
-            primaryLoadManager.playLeader();
-            secondaryLoadManager.playFollower();
-            secondaryLoadManager.playFollower();
-            primaryLoadManager.playFollower();
+        try {
+            FieldUtils.writeDeclaredField(primaryLoadManager, 
"topBundlesLoadDataStore",
+                    topBundlesLoadDataStorePrimarySpy, true);
+            FieldUtils.writeDeclaredField(secondaryLoadManager, 
"topBundlesLoadDataStore",
+                    topBundlesLoadDataStoreSecondarySpy, true);
+
+
+            if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
+                primaryLoadManager.playLeader();
+                secondaryLoadManager.playFollower();
+                verify(topBundlesLoadDataStorePrimarySpy, 
times(3)).startTableView();
+                verify(topBundlesLoadDataStorePrimarySpy, 
times(5)).closeTableView();
+                verify(topBundlesLoadDataStoreSecondarySpy, 
times(0)).startTableView();
+                verify(topBundlesLoadDataStoreSecondarySpy, 
times(3)).closeTableView();
+            } else {
+                primaryLoadManager.playFollower();
+                secondaryLoadManager.playLeader();
+                verify(topBundlesLoadDataStoreSecondarySpy, 
times(3)).startTableView();
+                verify(topBundlesLoadDataStoreSecondarySpy, 
times(5)).closeTableView();
+                verify(topBundlesLoadDataStorePrimarySpy, 
times(0)).startTableView();
+                verify(topBundlesLoadDataStorePrimarySpy, 
times(3)).closeTableView();
+            }
+
             primaryLoadManager.playFollower();
-            secondaryLoadManager.playLeader();
-            secondaryLoadManager.playLeader();
-        }
+            secondaryLoadManager.playFollower();
 
+            if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
+                assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+                        FieldUtils.readDeclaredField(primaryLoadManager, 
"role", true));
+                assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+                        FieldUtils.readDeclaredField(secondaryLoadManager, 
"role", true));
+            } else {
+                assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+                        FieldUtils.readDeclaredField(primaryLoadManager, 
"role", true));
+                assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+                        FieldUtils.readDeclaredField(secondaryLoadManager, 
"role", true));
+            }
 
-        verify(topBundlesLoadDataStorePrimarySpy, times(4)).startTableView();
-        verify(topBundlesLoadDataStorePrimarySpy, times(8)).closeTableView();
-        verify(topBundlesLoadDataStoreSecondarySpy, times(4)).startTableView();
-        verify(topBundlesLoadDataStoreSecondarySpy, times(8)).closeTableView();
+            primaryLoadManager.playLeader();
+            secondaryLoadManager.playLeader();
 
-        FieldUtils.writeDeclaredField(primaryLoadManager, 
"topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true);
-        FieldUtils.writeDeclaredField(secondaryLoadManager, 
"topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true);
+            if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
+                assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+                        FieldUtils.readDeclaredField(primaryLoadManager, 
"role", true));
+                assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+                        FieldUtils.readDeclaredField(secondaryLoadManager, 
"role", true));
+            } else {
+                assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+                        FieldUtils.readDeclaredField(primaryLoadManager, 
"role", true));
+                assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+                        FieldUtils.readDeclaredField(secondaryLoadManager, 
"role", true));
+            }
+        } finally {
+            FieldUtils.writeDeclaredField(primaryLoadManager, 
"topBundlesLoadDataStore",
+                    topBundlesLoadDataStorePrimary, true);
+            FieldUtils.writeDeclaredField(secondaryLoadManager, 
"topBundlesLoadDataStore",
+                    topBundlesLoadDataStoreSecondary, true);
+        }
     }
 
     @Test
@@ -1252,6 +1286,32 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
                 });
     }
 
+    @Test(timeOut = 10 * 1000)
+    public void unloadTimeoutCheckTest()
+            throws Exception {
+        Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("unload-timeout");
+        String topic = topicAndBundle.getLeft().toString();
+        var bundle = topicAndBundle.getRight().toString();
+        var releasing = new ServiceUnitStateData(Releasing, 
pulsar2.getBrokerId(), pulsar1.getBrokerId(), 1);
+        overrideTableView(channel1, bundle, releasing);
+        var topicFuture = pulsar1.getBrokerService().getOrCreateTopic(topic);
+
+
+        try {
+            topicFuture.get(1, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.info("getOrCreateTopic failed", e);
+            if (!(e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException && e.getMessage()
+                    .contains("Please redo the lookup"))) {
+                fail();
+            }
+        }
+
+        pulsar1.getBrokerService()
+                .unloadServiceUnit(topicAndBundle.getRight(), true, 5,
+                        TimeUnit.SECONDS).get(2, TimeUnit.SECONDS);
+    }
+
     private static abstract class MockBrokerFilter implements BrokerFilter {
 
         @Override
@@ -1284,4 +1344,20 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService 
pulsar, TopicName topic) {
         return pulsar.getNamespaceService().getBundleAsync(topic);
     }
+
+    private Pair<TopicName, NamespaceBundle> 
getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
+            throws Exception {
+        TopicName changeEventsTopicName =
+                TopicName.get(defaultTestNamespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, 
changeEventsTopicName).get();
+        int i = 0;
+        while (true) {
+            TopicName topicName = TopicName.get(defaultTestNamespace + "/" + 
topicNamePrefix + "-" + i);
+            NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+            if (!bundle.equals(changeEventsBundle)) {
+                return Pair.of(topicName, bundle);
+            }
+            i++;
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 51afbb23096..ceb58e8d964 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -484,19 +484,17 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var owner1 = channel1.getOwnerAsync(bundle);
         var owner2 = channel2.getOwnerAsync(bundle);
 
-        assertFalse(owner1.isDone());
+        assertTrue(owner1.isDone());
+        assertEquals(brokerId2, owner1.get().get());
         assertFalse(owner2.isDone());
 
-        assertEquals(1, getOwnerRequests1.size());
+        assertEquals(0, getOwnerRequests1.size());
         assertEquals(1, getOwnerRequests2.size());
 
         // In 10 secs, the getOwnerAsync requests(lookup requests) should time 
out.
-        Awaitility.await().atMost(10, TimeUnit.SECONDS)
-                .untilAsserted(() -> 
assertTrue(owner1.isCompletedExceptionally()));
         Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertTrue(owner2.isCompletedExceptionally()));
 
-        assertEquals(0, getOwnerRequests1.size());
         assertEquals(0, getOwnerRequests2.size());
 
         // recovered, check the monitor update state : Assigned -> Owned
@@ -1133,12 +1131,10 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var owner1 = channel1.getOwnerAsync(bundle);
         var owner2 = channel2.getOwnerAsync(bundle);
 
-        assertFalse(owner1.isDone());
+        assertTrue(owner1.isDone());
         assertFalse(owner2.isDone());
 
         // In 10 secs, the getOwnerAsync requests(lookup requests) should time 
out.
-        Awaitility.await().atMost(10, TimeUnit.SECONDS)
-                .untilAsserted(() -> 
assertTrue(owner1.isCompletedExceptionally()));
         Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertTrue(owner2.isCompletedExceptionally()));
 
@@ -1317,6 +1313,68 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertFalse(channel1.isOwner(bundle));
     }
 
+    @Test(priority = 15)
+    public void testGetOwnerAsync() throws Exception {
+
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, 
brokerId1, 1));
+        var owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertEquals(brokerId1, owner.get().get());
+
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, 
brokerId2, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertEquals(brokerId2, owner.get().get());
+
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Assigning, brokerId1, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(!owner.isDone());
+
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Assigning, brokerId2, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertEquals(brokerId2, owner.get().get());
+
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Releasing, brokerId1, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(!owner.isDone());
+
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Releasing, brokerId2, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertEquals(brokerId2, owner.get().get());
+
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Releasing, null, brokerId1, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertEquals(Optional.empty(), owner.get());
+
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Splitting, null, brokerId1, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertEquals(brokerId1, owner.get().get());
+
+        overrideTableView(channel1, bundle, new 
ServiceUnitStateData(Splitting, null, brokerId2, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertEquals(brokerId2, owner.get().get());
+
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, 
null, brokerId1, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertEquals(Optional.empty(), owner.get());
+
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, 
null, brokerId1, 1));
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertTrue(owner.isCompletedExceptionally());
+
+        overrideTableView(channel1, bundle, null);
+        owner = channel1.getOwnerAsync(bundle);
+        assertTrue(owner.isDone());
+        assertEquals(Optional.empty(), owner.get());
+    }
+
     @Test(priority = 16)
     public void splitAndRetryFailureTest() throws Exception {
         channel1.publishAssignEventAsync(bundle3, brokerId1);
@@ -1775,7 +1833,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         overrideTableView(channel2, serviceUnit, val);
     }
 
-    private static void overrideTableView(ServiceUnitStateChannel channel, 
String serviceUnit, ServiceUnitStateData val)
+    @Test(enabled = false)
+    public static void overrideTableView(ServiceUnitStateChannel channel, 
String serviceUnit, ServiceUnitStateData val)
             throws IllegalAccessException {
         var tv = (TableViewImpl<ServiceUnitStateData>)
                 FieldUtils.readField(channel, "tableview", true);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index b9707ea76c3..af14ef97f85 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -324,8 +324,8 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
         assertEquals(result.size(), NUM_BROKERS);
     }
 
-    @Test(timeOut = 40 * 1000)
-    public void testIsolationPolicy() throws PulsarAdminException {
+    @Test(timeOut = 300 * 1000)
+    public void testIsolationPolicy() throws Exception {
         final String namespaceIsolationPolicyName = "my-isolation-policy";
         final String isolationEnabledNameSpace = DEFAULT_TENANT + 
"/my-isolation-policy" + nsSuffix;
         Map<String, String> parameters1 = new HashMap<>();
@@ -334,7 +334,8 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
 
         Awaitility.await().atMost(10, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
                 () -> {
-                    List<String> activeBrokers = 
admin.brokers().getActiveBrokers();
+                    List<String> activeBrokers = 
admin.brokers().getActiveBrokersAsync()
+                            .get(5, TimeUnit.SECONDS);
                     assertEquals(activeBrokers.size(), NUM_BROKERS);
                 }
         );
@@ -377,15 +378,16 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
             }
         }
 
-        Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
+        Awaitility.await().atMost(60, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
                 () -> {
-                    List<String> activeBrokers = 
admin.brokers().getActiveBrokers();
+                    List<String> activeBrokers = 
admin.brokers().getActiveBrokersAsync()
+                            .get(5, TimeUnit.SECONDS);
                     assertEquals(activeBrokers.size(), 2);
                 }
         );
 
         Awaitility.await().atMost(60, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
-            String ownerBroker = admin.lookups().lookupTopic(topic);
+            String ownerBroker = 
admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS);
             assertEquals(extractBrokerIndex(ownerBroker), 1);
         });
 
@@ -396,20 +398,23 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
             }
         }
 
-        Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
+        Awaitility.await().atMost(60, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
             () -> {
-                List<String> activeBrokers = 
admin.brokers().getActiveBrokers();
+                List<String> activeBrokers = 
admin.brokers().getActiveBrokersAsync().get(5, TimeUnit.SECONDS);
                 assertEquals(activeBrokers.size(), 1);
             }
         );
 
-        try {
-            admin.lookups().lookupTopic(topic);
-            fail();
-        } catch (Exception ex) {
-            log.error("Failed to lookup topic: ", ex);
-            assertThat(ex.getMessage()).contains("Failed to select the new 
owner broker for bundle");
-        }
+        Awaitility.await().atMost(60, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
+                () -> {
+                    try {
+                        admin.lookups().lookupTopicAsync(topic).get(5, 
TimeUnit.SECONDS);
+                    } catch (Exception ex) {
+                        log.error("Failed to lookup topic: ", ex);
+                        assertThat(ex.getMessage()).contains("Failed to select 
the new owner broker for bundle");
+                    }
+                }
+        );
     }
 
     private String getBrokerUrl(int index) {

Reply via email to