This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 469ce7e1048 [fix][test] Fix ExtensibleLoadManagerImplTest flaky test 
(#21479)
469ce7e1048 is described below

commit 469ce7e10483a8e8fece91dfe2fc90929277e2c0
Author: Kai Wang <[email protected]>
AuthorDate: Tue Nov 7 08:02:13 2023 -0600

    [fix][test] Fix ExtensibleLoadManagerImplTest flaky test (#21479)
---
 .../extensions/ExtensibleLoadManagerImplTest.java  | 67 +++++++++++++++-------
 1 file changed, 45 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 158488bc84a..fb49071fa47 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -71,6 +71,7 @@ import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -105,6 +106,7 @@ import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BrokerAssignment;
@@ -194,7 +196,7 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
             
admin.namespaces().setNamespaceReplicationClusters("public/default",
                     Sets.newHashSet(this.conf.getClusterName()));
 
-            admin.namespaces().createNamespace(defaultTestNamespace);
+            admin.namespaces().createNamespace(defaultTestNamespace, 128);
             
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
                     Sets.newHashSet(this.conf.getClusterName()));
             lookupService = (LookupService) 
FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
@@ -237,8 +239,9 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testAssign() throws Exception {
-        TopicName topicName = TopicName.get(defaultTestNamespace + 
"/test-assign");
-        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+        Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-assign");
+        TopicName topicName = topicAndBundle.getLeft();
+        NamespaceBundle bundle = topicAndBundle.getRight();
         Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
         assertTrue(brokerLookupData.isPresent());
         log.info("Assign the bundle {} to {}", bundle, brokerLookupData);
@@ -262,8 +265,8 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCheckOwnershipAsync() throws Exception {
-        TopicName topicName = TopicName.get(defaultTestNamespace + 
"/test-check-ownership");
-        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+        Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-check-ownership");
+        NamespaceBundle bundle = topicAndBundle.getRight();
         // 1. The bundle is never assigned.
         retryStrategically((test) -> {
             try {
@@ -291,8 +294,8 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testFilter() throws Exception {
-        TopicName topicName = TopicName.get(defaultTestNamespace + 
"/test-filter");
-        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+        Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-filter");
+        NamespaceBundle bundle = topicAndBundle.getRight();
 
         doReturn(List.of(new BrokerFilter() {
             @Override
@@ -317,8 +320,8 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testFilterHasException() throws Exception {
-        TopicName topicName = TopicName.get(defaultTestNamespace + 
"/test-filter-has-exception");
-        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+        Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-filter-has-exception");
+        NamespaceBundle bundle = topicAndBundle.getRight();
 
         doReturn(List.of(new MockBrokerFilter() {
             @Override
@@ -336,8 +339,9 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 30 * 1000)
     public void testUnloadAdminAPI() throws Exception {
-        TopicName topicName = TopicName.get(defaultTestNamespace + 
"/test-unload");
-        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+        Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-unload");
+        TopicName topicName = topicAndBundle.getLeft();
+        NamespaceBundle bundle = topicAndBundle.getRight();
 
         AtomicInteger onloadCount = new AtomicInteger(0);
         AtomicInteger unloadCount = new AtomicInteger(0);
@@ -534,8 +538,9 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     @Test(timeOut = 30 * 1000)
     public void testSplitBundleAdminAPI() throws Exception {
         String namespace = defaultTestNamespace;
-        String topic = "persistent://" + namespace + "/test-split";
-        admin.topics().createPartitionedTopic(topic, 10);
+        Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-split");
+        TopicName topicName = topicAndBundle.getLeft();
+        admin.topics().createPartitionedTopic(topicName.toString(), 10);
         BundlesData bundles = admin.namespaces().getBundles(namespace);
         int numBundles = bundles.getNumBundles();
         var bundleRanges = 
bundles.getBoundaries().stream().map(Long::decode).sorted().toList();
@@ -586,7 +591,7 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception 
{
         String namespace = defaultTestNamespace;
         String topic = "persistent://" + namespace + 
"/test-split-with-specific-position";
-        admin.topics().createPartitionedTopic(topic, 10);
+        admin.topics().createPartitionedTopic(topic, 1024);
         BundlesData bundles = admin.namespaces().getBundles(namespace);
         int numBundles = bundles.getNumBundles();
 
@@ -664,9 +669,11 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     public void testMoreThenOneFilter() throws Exception {
         // Use a different namespace to avoid flaky test failures
         // from unloading the default namespace and the following topic policy 
lookups at the init state step
-        String namespace = "public/my-namespace";
-        TopicName topicName = TopicName.get(namespace + 
"/test-filter-has-exception");
-        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+        Pair<TopicName, NamespaceBundle> topicAndBundle =
+                
getBundleIsNotOwnByChangeEventTopic("test-filter-has-exception");
+        TopicName topicName = topicAndBundle.getLeft();
+        NamespaceBundle bundle = topicAndBundle.getRight();
+
         String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
         doReturn(List.of(new MockBrokerFilter() {
             @Override
@@ -684,7 +691,6 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
                 return FutureUtil.failedFuture(new 
BrokerFilterException("Test"));
             }
         })).when(primaryLoadManager).getBrokerFilterPipeline();
-        admin.namespaces().createNamespace(namespace);
         Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
         Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
             assertTrue(brokerLookupData.isPresent());
@@ -694,8 +700,6 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
             assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
                     
pulsar2.getAdminClient().lookups().lookupTopic(topicName.toString()));
         });
-
-        admin.namespaces().deleteNamespace(namespace, true);
     }
 
     @Test
@@ -718,7 +722,11 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
             try (var additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf)) {
                 // start pulsar3 with old load manager
                 var pulsar3 = additionalPulsarTestContext.getPulsarService();
-                String topic = "persistent://" + defaultTestNamespace + 
"/test";
+                Pair<TopicName, NamespaceBundle> topicAndBundle =
+                        
getBundleIsNotOwnByChangeEventTopic("testDeployAndRollbackLoadManager");
+                TopicName topicName = topicAndBundle.getLeft();
+                NamespaceBundle bundle = topicAndBundle.getRight();
+                String topic = topicName.toString();
 
                 String lookupResult1 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
                 assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
@@ -728,7 +736,6 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
                 assertEquals(lookupResult1, lookupResult2);
                 assertEquals(lookupResult1, lookupResult3);
 
-                NamespaceBundle bundle = getBundleAsync(pulsar1, 
TopicName.get(topic)).get();
                 LookupOptions options = LookupOptions.builder()
                         .authoritative(false)
                         .requestHttps(false)
@@ -1400,4 +1407,20 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService 
pulsar, TopicName topic) {
         return pulsar.getNamespaceService().getBundleAsync(topic);
     }
+
+    private Pair<TopicName, NamespaceBundle> 
getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
+            throws Exception {
+        TopicName changeEventsTopicName =
+                TopicName.get(defaultTestNamespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, 
changeEventsTopicName).get();
+        int i = 0;
+        while(true) {
+            TopicName topicName = TopicName.get(defaultTestNamespace + "/" + 
topicNamePrefix + "-" + i);
+            NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+            if (!bundle.equals(changeEventsBundle)) {
+                return Pair.of(topicName, bundle);
+            }
+            i++;
+        }
+    }
 }

Reply via email to