This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 58a45aa2d2a [fix][broker][branch-3.0] Return getOwnerAsync without
waiting on source broker upon Assigning and Releasing and handle role change
during role init (#22112) (#22156)
58a45aa2d2a is described below
commit 58a45aa2d2abf1014675cd367132c22000907a3f
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Feb 28 18:18:22 2024 -0800
[fix][broker][branch-3.0] Return getOwnerAsync without waiting on source
broker upon Assigning and Releasing and handle role change during role init
(#22112) (#22156)
(cherry picked from commit b3b1bfb3e2a29674cc9d6144baeef1a3f0058c07)
---
.../extensions/ExtensibleLoadManagerImpl.java | 24 ++++
.../channel/ServiceUnitStateChannelImpl.java | 11 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 128 ++++++++++++++++-----
.../channel/ServiceUnitStateChannelTest.java | 77 +++++++++++--
.../loadbalance/ExtensibleLoadManagerTest.java | 35 +++---
5 files changed, 224 insertions(+), 51 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 409bb55075b..6a0e677c662 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -777,8 +777,13 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
+ boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
+ if (!serviceUnitStateChannel.isChannelOwner()) {
+ becameFollower = true;
+ break;
+ }
initWaiter.await();
// Confirm the system topics have been created or create them
if they do not exist.
// If the leader has changed, the new leader need to reset
@@ -802,6 +807,13 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
}
}
+
+ if (becameFollower) {
+ log.warn("The broker:{} became follower while initializing leader
role.", pulsar.getBrokerId());
+ playFollower();
+ return;
+ }
+
role = Leader;
log.info("This broker:{} plays the leader now.", pulsar.getBrokerId());
@@ -815,8 +827,13 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
+ boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
+ if (serviceUnitStateChannel.isChannelOwner()) {
+ becameLeader = true;
+ break;
+ }
initWaiter.await();
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
@@ -836,6 +853,13 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
}
}
+
+ if (becameLeader) {
+ log.warn("This broker:{} became leader while initializing follower
role.", pulsar.getBrokerId());
+ playLeader();
+ return;
+ }
+
role = Follower;
log.info("This broker:{} plays a follower now.", pulsar.getBrokerId());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index a94ce7446bd..1471d4a75c1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -533,7 +533,16 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return getActiveOwnerAsync(serviceUnit, state,
Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
- return getActiveOwnerAsync(serviceUnit, state,
Optional.empty());
+ if (isTargetBroker(data.dstBroker())) {
+ return getActiveOwnerAsync(serviceUnit, state,
Optional.of(data.dstBroker()));
+ }
+ // If this broker is not the dst broker, return the dst broker
as the owner (or empty if there is no dst broker).
+ // Clients need to connect (redirect) to the dst broker anyway
+ // and wait for the dst broker to receive `Owned`.
+ // Returning immediately is also required so that getOwnerAsync on the src
broker does not wait during unloading.
+ // Otherwise, a topic creation (getOwnerAsync) could block
unloading bundles,
+ // if the topic creation (getOwnerAsync) happens during
unloading on the src broker.
+ return
CompletableFuture.completedFuture(Optional.ofNullable(data.dstBroker()));
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 7fe5b5a52ff..850bf9a96c8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelTest.overrideTableView;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
@@ -70,6 +72,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -78,6 +81,7 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
@@ -96,12 +100,14 @@ import
org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
@@ -794,7 +800,6 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
reset();
return null;
}).when(topBundlesLoadDataStorePrimarySpy).closeTableView();
- FieldUtils.writeDeclaredField(primaryLoadManager,
"topBundlesLoadDataStore", topBundlesLoadDataStorePrimarySpy, true);
var topBundlesLoadDataStoreSecondary =
(LoadDataStore<TopBundlesLoadData>)
FieldUtils.readDeclaredField(secondaryLoadManager,
"topBundlesLoadDataStore", true);
@@ -817,36 +822,65 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
reset();
return null;
}).when(topBundlesLoadDataStoreSecondarySpy).closeTableView();
- FieldUtils.writeDeclaredField(secondaryLoadManager,
"topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true);
- if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
- primaryLoadManager.playFollower(); // close 3 times
- primaryLoadManager.playFollower(); // close 1 time
- secondaryLoadManager.playLeader();
- secondaryLoadManager.playLeader();
- primaryLoadManager.playLeader(); // close 3 times and open 3 times
- primaryLoadManager.playLeader(); // close 1 time and open 1 time,
- secondaryLoadManager.playFollower();
- secondaryLoadManager.playFollower();
- } else {
- primaryLoadManager.playLeader();
- primaryLoadManager.playLeader();
- secondaryLoadManager.playFollower();
- secondaryLoadManager.playFollower();
- primaryLoadManager.playFollower();
+ try {
+ FieldUtils.writeDeclaredField(primaryLoadManager,
"topBundlesLoadDataStore",
+ topBundlesLoadDataStorePrimarySpy, true);
+ FieldUtils.writeDeclaredField(secondaryLoadManager,
"topBundlesLoadDataStore",
+ topBundlesLoadDataStoreSecondarySpy, true);
+
+
+ if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
+ primaryLoadManager.playLeader();
+ secondaryLoadManager.playFollower();
+ verify(topBundlesLoadDataStorePrimarySpy,
times(3)).startTableView();
+ verify(topBundlesLoadDataStorePrimarySpy,
times(5)).closeTableView();
+ verify(topBundlesLoadDataStoreSecondarySpy,
times(0)).startTableView();
+ verify(topBundlesLoadDataStoreSecondarySpy,
times(3)).closeTableView();
+ } else {
+ primaryLoadManager.playFollower();
+ secondaryLoadManager.playLeader();
+ verify(topBundlesLoadDataStoreSecondarySpy,
times(3)).startTableView();
+ verify(topBundlesLoadDataStoreSecondarySpy,
times(5)).closeTableView();
+ verify(topBundlesLoadDataStorePrimarySpy,
times(0)).startTableView();
+ verify(topBundlesLoadDataStorePrimarySpy,
times(3)).closeTableView();
+ }
+
primaryLoadManager.playFollower();
- secondaryLoadManager.playLeader();
- secondaryLoadManager.playLeader();
- }
+ secondaryLoadManager.playFollower();
+ if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
+ assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+ FieldUtils.readDeclaredField(primaryLoadManager,
"role", true));
+ assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+ FieldUtils.readDeclaredField(secondaryLoadManager,
"role", true));
+ } else {
+ assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+ FieldUtils.readDeclaredField(primaryLoadManager,
"role", true));
+ assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+ FieldUtils.readDeclaredField(secondaryLoadManager,
"role", true));
+ }
- verify(topBundlesLoadDataStorePrimarySpy, times(4)).startTableView();
- verify(topBundlesLoadDataStorePrimarySpy, times(8)).closeTableView();
- verify(topBundlesLoadDataStoreSecondarySpy, times(4)).startTableView();
- verify(topBundlesLoadDataStoreSecondarySpy, times(8)).closeTableView();
+ primaryLoadManager.playLeader();
+ secondaryLoadManager.playLeader();
- FieldUtils.writeDeclaredField(primaryLoadManager,
"topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true);
- FieldUtils.writeDeclaredField(secondaryLoadManager,
"topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true);
+ if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
+ assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+ FieldUtils.readDeclaredField(primaryLoadManager,
"role", true));
+ assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+ FieldUtils.readDeclaredField(secondaryLoadManager,
"role", true));
+ } else {
+ assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+ FieldUtils.readDeclaredField(primaryLoadManager,
"role", true));
+ assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+ FieldUtils.readDeclaredField(secondaryLoadManager,
"role", true));
+ }
+ } finally {
+ FieldUtils.writeDeclaredField(primaryLoadManager,
"topBundlesLoadDataStore",
+ topBundlesLoadDataStorePrimary, true);
+ FieldUtils.writeDeclaredField(secondaryLoadManager,
"topBundlesLoadDataStore",
+ topBundlesLoadDataStoreSecondary, true);
+ }
}
@Test
@@ -1252,6 +1286,32 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
});
}
+ @Test(timeOut = 10 * 1000)
+ public void unloadTimeoutCheckTest()
+ throws Exception {
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("unload-timeout");
+ String topic = topicAndBundle.getLeft().toString();
+ var bundle = topicAndBundle.getRight().toString();
+ var releasing = new ServiceUnitStateData(Releasing,
pulsar2.getBrokerId(), pulsar1.getBrokerId(), 1);
+ overrideTableView(channel1, bundle, releasing);
+ var topicFuture = pulsar1.getBrokerService().getOrCreateTopic(topic);
+
+
+ try {
+ topicFuture.get(1, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.info("getOrCreateTopic failed", e);
+ if (!(e.getCause() instanceof
BrokerServiceException.ServiceUnitNotReadyException && e.getMessage()
+ .contains("Please redo the lookup"))) {
+ fail();
+ }
+ }
+
+ pulsar1.getBrokerService()
+ .unloadServiceUnit(topicAndBundle.getRight(), true, 5,
+ TimeUnit.SECONDS).get(2, TimeUnit.SECONDS);
+ }
+
private static abstract class MockBrokerFilter implements BrokerFilter {
@Override
@@ -1284,4 +1344,20 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService
pulsar, TopicName topic) {
return pulsar.getNamespaceService().getBundleAsync(topic);
}
+
+ private Pair<TopicName, NamespaceBundle>
getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
+ throws Exception {
+ TopicName changeEventsTopicName =
+ TopicName.get(defaultTestNamespace + "/" +
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1,
changeEventsTopicName).get();
+ int i = 0;
+ while (true) {
+ TopicName topicName = TopicName.get(defaultTestNamespace + "/" +
topicNamePrefix + "-" + i);
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ if (!bundle.equals(changeEventsBundle)) {
+ return Pair.of(topicName, bundle);
+ }
+ i++;
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 51afbb23096..ceb58e8d964 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -484,19 +484,17 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);
- assertFalse(owner1.isDone());
+ assertTrue(owner1.isDone());
+ assertEquals(brokerId2, owner1.get().get());
assertFalse(owner2.isDone());
- assertEquals(1, getOwnerRequests1.size());
+ assertEquals(0, getOwnerRequests1.size());
assertEquals(1, getOwnerRequests2.size());
// In 10 secs, the getOwnerAsync requests(lookup requests) should time
out.
- Awaitility.await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() ->
assertTrue(owner1.isCompletedExceptionally()));
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertTrue(owner2.isCompletedExceptionally()));
- assertEquals(0, getOwnerRequests1.size());
assertEquals(0, getOwnerRequests2.size());
// recovered, check the monitor update state : Assigned -> Owned
@@ -1133,12 +1131,10 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);
- assertFalse(owner1.isDone());
+ assertTrue(owner1.isDone());
assertFalse(owner2.isDone());
// In 10 secs, the getOwnerAsync requests(lookup requests) should time
out.
- Awaitility.await().atMost(10, TimeUnit.SECONDS)
- .untilAsserted(() ->
assertTrue(owner1.isCompletedExceptionally()));
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertTrue(owner2.isCompletedExceptionally()));
@@ -1317,6 +1313,68 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertFalse(channel1.isOwner(bundle));
}
+ @Test(priority = 15)
+ public void testGetOwnerAsync() throws Exception {
+
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned,
brokerId1, 1));
+ var owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertEquals(brokerId1, owner.get().get());
+
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned,
brokerId2, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertEquals(brokerId2, owner.get().get());
+
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Assigning, brokerId1, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(!owner.isDone());
+
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Assigning, brokerId2, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertEquals(brokerId2, owner.get().get());
+
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Releasing, brokerId1, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(!owner.isDone());
+
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Releasing, brokerId2, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertEquals(brokerId2, owner.get().get());
+
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Releasing, null, brokerId1, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertEquals(Optional.empty(), owner.get());
+
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Splitting, null, brokerId1, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertEquals(brokerId1, owner.get().get());
+
+ overrideTableView(channel1, bundle, new
ServiceUnitStateData(Splitting, null, brokerId2, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertEquals(brokerId2, owner.get().get());
+
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Free,
null, brokerId1, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertEquals(Optional.empty(), owner.get());
+
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted,
null, brokerId1, 1));
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertTrue(owner.isCompletedExceptionally());
+
+ overrideTableView(channel1, bundle, null);
+ owner = channel1.getOwnerAsync(bundle);
+ assertTrue(owner.isDone());
+ assertEquals(Optional.empty(), owner.get());
+ }
+
@Test(priority = 16)
public void splitAndRetryFailureTest() throws Exception {
channel1.publishAssignEventAsync(bundle3, brokerId1);
@@ -1775,7 +1833,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
overrideTableView(channel2, serviceUnit, val);
}
- private static void overrideTableView(ServiceUnitStateChannel channel,
String serviceUnit, ServiceUnitStateData val)
+ @Test(enabled = false)
+ public static void overrideTableView(ServiceUnitStateChannel channel,
String serviceUnit, ServiceUnitStateData val)
throws IllegalAccessException {
var tv = (TableViewImpl<ServiceUnitStateData>)
FieldUtils.readField(channel, "tableview", true);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index b9707ea76c3..af14ef97f85 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -324,8 +324,8 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
assertEquals(result.size(), NUM_BROKERS);
}
- @Test(timeOut = 40 * 1000)
- public void testIsolationPolicy() throws PulsarAdminException {
+ @Test(timeOut = 300 * 1000)
+ public void testIsolationPolicy() throws Exception {
final String namespaceIsolationPolicyName = "my-isolation-policy";
final String isolationEnabledNameSpace = DEFAULT_TENANT +
"/my-isolation-policy" + nsSuffix;
Map<String, String> parameters1 = new HashMap<>();
@@ -334,7 +334,8 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
Awaitility.await().atMost(10,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
- List<String> activeBrokers =
admin.brokers().getActiveBrokers();
+ List<String> activeBrokers =
admin.brokers().getActiveBrokersAsync()
+ .get(5, TimeUnit.SECONDS);
assertEquals(activeBrokers.size(), NUM_BROKERS);
}
);
@@ -377,15 +378,16 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
}
}
- Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
+ Awaitility.await().atMost(60,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
- List<String> activeBrokers =
admin.brokers().getActiveBrokers();
+ List<String> activeBrokers =
admin.brokers().getActiveBrokersAsync()
+ .get(5, TimeUnit.SECONDS);
assertEquals(activeBrokers.size(), 2);
}
);
Awaitility.await().atMost(60,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
- String ownerBroker = admin.lookups().lookupTopic(topic);
+ String ownerBroker =
admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS);
assertEquals(extractBrokerIndex(ownerBroker), 1);
});
@@ -396,20 +398,23 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
}
}
- Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
+ Awaitility.await().atMost(60,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
- List<String> activeBrokers =
admin.brokers().getActiveBrokers();
+ List<String> activeBrokers =
admin.brokers().getActiveBrokersAsync().get(5, TimeUnit.SECONDS);
assertEquals(activeBrokers.size(), 1);
}
);
- try {
- admin.lookups().lookupTopic(topic);
- fail();
- } catch (Exception ex) {
- log.error("Failed to lookup topic: ", ex);
- assertThat(ex.getMessage()).contains("Failed to select the new
owner broker for bundle");
- }
+ Awaitility.await().atMost(60,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
+ () -> {
+ try {
+ admin.lookups().lookupTopicAsync(topic).get(5,
TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ log.error("Failed to lookup topic: ", ex);
+ assertThat(ex.getMessage()).contains("Failed to select
the new owner broker for bundle");
+ }
+ }
+ );
}
private String getBrokerUrl(int index) {