This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 469ce7e1048 [fix][test] Fix ExtensibleLoadManagerImplTest flaky test
(#21479)
469ce7e1048 is described below
commit 469ce7e10483a8e8fece91dfe2fc90929277e2c0
Author: Kai Wang <[email protected]>
AuthorDate: Tue Nov 7 08:02:13 2023 -0600
[fix][test] Fix ExtensibleLoadManagerImplTest flaky test (#21479)
---
.../extensions/ExtensibleLoadManagerImplTest.java | 67 +++++++++++++++-------
1 file changed, 45 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 158488bc84a..fb49071fa47 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -71,6 +71,7 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -105,6 +106,7 @@ import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
@@ -194,7 +196,7 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().setNamespaceReplicationClusters("public/default",
Sets.newHashSet(this.conf.getClusterName()));
- admin.namespaces().createNamespace(defaultTestNamespace);
+ admin.namespaces().createNamespace(defaultTestNamespace, 128);
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
Sets.newHashSet(this.conf.getClusterName()));
lookupService = (LookupService)
FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
@@ -237,8 +239,9 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Test
public void testAssign() throws Exception {
- TopicName topicName = TopicName.get(defaultTestNamespace +
"/test-assign");
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-assign");
+ TopicName topicName = topicAndBundle.getLeft();
+ NamespaceBundle bundle = topicAndBundle.getRight();
Optional<BrokerLookupData> brokerLookupData =
primaryLoadManager.assign(Optional.empty(), bundle).get();
assertTrue(brokerLookupData.isPresent());
log.info("Assign the bundle {} to {}", bundle, brokerLookupData);
@@ -262,8 +265,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCheckOwnershipAsync() throws Exception {
- TopicName topicName = TopicName.get(defaultTestNamespace +
"/test-check-ownership");
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-check-ownership");
+ NamespaceBundle bundle = topicAndBundle.getRight();
// 1. The bundle is never assigned.
retryStrategically((test) -> {
try {
@@ -291,8 +294,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Test
public void testFilter() throws Exception {
- TopicName topicName = TopicName.get(defaultTestNamespace +
"/test-filter");
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-filter");
+ NamespaceBundle bundle = topicAndBundle.getRight();
doReturn(List.of(new BrokerFilter() {
@Override
@@ -317,8 +320,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Test
public void testFilterHasException() throws Exception {
- TopicName topicName = TopicName.get(defaultTestNamespace +
"/test-filter-has-exception");
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-filter-has-exception");
+ NamespaceBundle bundle = topicAndBundle.getRight();
doReturn(List.of(new MockBrokerFilter() {
@Override
@@ -336,8 +339,9 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 30 * 1000)
public void testUnloadAdminAPI() throws Exception {
- TopicName topicName = TopicName.get(defaultTestNamespace +
"/test-unload");
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-unload");
+ TopicName topicName = topicAndBundle.getLeft();
+ NamespaceBundle bundle = topicAndBundle.getRight();
AtomicInteger onloadCount = new AtomicInteger(0);
AtomicInteger unloadCount = new AtomicInteger(0);
@@ -534,8 +538,9 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminAPI() throws Exception {
String namespace = defaultTestNamespace;
- String topic = "persistent://" + namespace + "/test-split";
- admin.topics().createPartitionedTopic(topic, 10);
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-split");
+ TopicName topicName = topicAndBundle.getLeft();
+ admin.topics().createPartitionedTopic(topicName.toString(), 10);
BundlesData bundles = admin.namespaces().getBundles(namespace);
int numBundles = bundles.getNumBundles();
var bundleRanges =
bundles.getBoundaries().stream().map(Long::decode).sorted().toList();
@@ -586,7 +591,7 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception
{
String namespace = defaultTestNamespace;
String topic = "persistent://" + namespace +
"/test-split-with-specific-position";
- admin.topics().createPartitionedTopic(topic, 10);
+ admin.topics().createPartitionedTopic(topic, 1024);
BundlesData bundles = admin.namespaces().getBundles(namespace);
int numBundles = bundles.getNumBundles();
@@ -664,9 +669,11 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
public void testMoreThenOneFilter() throws Exception {
// Use a different namespace to avoid flaky test failures
// from unloading the default namespace and the following topic policy
lookups at the init state step
- String namespace = "public/my-namespace";
- TopicName topicName = TopicName.get(namespace +
"/test-filter-has-exception");
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
+
getBundleIsNotOwnByChangeEventTopic("test-filter-has-exception");
+ TopicName topicName = topicAndBundle.getLeft();
+ NamespaceBundle bundle = topicAndBundle.getRight();
+
String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
doReturn(List.of(new MockBrokerFilter() {
@Override
@@ -684,7 +691,6 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
return FutureUtil.failedFuture(new
BrokerFilterException("Test"));
}
})).when(primaryLoadManager).getBrokerFilterPipeline();
- admin.namespaces().createNamespace(namespace);
Optional<BrokerLookupData> brokerLookupData =
primaryLoadManager.assign(Optional.empty(), bundle).get();
Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(brokerLookupData.isPresent());
@@ -694,8 +700,6 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
pulsar2.getAdminClient().lookups().lookupTopic(topicName.toString()));
});
-
- admin.namespaces().deleteNamespace(namespace, true);
}
@Test
@@ -718,7 +722,11 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
try (var additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf)) {
// start pulsar3 with old load manager
var pulsar3 = additionalPulsarTestContext.getPulsarService();
- String topic = "persistent://" + defaultTestNamespace +
"/test";
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
+
getBundleIsNotOwnByChangeEventTopic("testDeployAndRollbackLoadManager");
+ TopicName topicName = topicAndBundle.getLeft();
+ NamespaceBundle bundle = topicAndBundle.getRight();
+ String topic = topicName.toString();
String lookupResult1 =
pulsar3.getAdminClient().lookups().lookupTopic(topic);
assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
@@ -728,7 +736,6 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertEquals(lookupResult1, lookupResult2);
assertEquals(lookupResult1, lookupResult3);
- NamespaceBundle bundle = getBundleAsync(pulsar1,
TopicName.get(topic)).get();
LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(false)
@@ -1400,4 +1407,20 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService
pulsar, TopicName topic) {
return pulsar.getNamespaceService().getBundleAsync(topic);
}
+
+ private Pair<TopicName, NamespaceBundle>
getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
+ throws Exception {
+ TopicName changeEventsTopicName =
+ TopicName.get(defaultTestNamespace + "/" +
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1,
changeEventsTopicName).get();
+ int i = 0;
+ while(true) {
+ TopicName topicName = TopicName.get(defaultTestNamespace + "/" +
topicNamePrefix + "-" + i);
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ if (!bundle.equals(changeEventsBundle)) {
+ return Pair.of(topicName, bundle);
+ }
+ i++;
+ }
+ }
}