This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new be4bcac11ae [fix] [broker] fix flaky test PatternTopicsConsumerImplTest (#21222)
be4bcac11ae is described below

commit be4bcac11ae76cdf3d4c4b0639735f3309919e4c
Author: fengyubiao <[email protected]>
AuthorDate: Sat Sep 23 22:15:23 2023 +0800

    [fix] [broker] fix flaky test PatternTopicsConsumerImplTest (#21222)
---
 .../client/impl/PatternTopicsConsumerImplTest.java | 55 ++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index c8f7b721cce..451f93067b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -67,6 +68,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         isTcpLookup = true;
         // enabled transaction, to test pattern consumers not subscribe to 
transaction system topic.
         conf.setTransactionCoordinatorEnabled(true);
+        conf.setSubscriptionPatternMaxLength(10000);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -210,6 +212,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .subscribe();
         
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
+        // Wait topic list watcher creation.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
+
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
@@ -287,6 +295,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
                 .subscribe();
         
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
+        // Wait topic list watcher creation.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
+
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
@@ -364,6 +378,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .subscriptionTopicsMode(RegexSubscriptionMode.NonPersistentOnly)
             .subscribe();
 
+        // Wait topic list watcher creation.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
+
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
@@ -455,6 +475,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .subscribe();
 
+        // Wait topic list watcher creation.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
+
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
         List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
@@ -525,6 +551,11 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
+        // Wait topic list watcher creation.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
 
         // 3. verify consumer get methods, to get 5 number of partitions and 
topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
@@ -605,6 +636,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
                 .receiverQueueSize(4)
                 .subscribe();
 
+        // Wait topic list watcher creation.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
+
         // 1. create partition
         String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + 
key;
         TenantInfoImpl tenantInfo = createDefaultTenantInfo();
@@ -665,6 +702,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .receiverQueueSize(4)
             .subscribe();
 
+        // Wait topic list watcher creation.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
+
         assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 6 number of partitions and 
topics: 6=1+2+3
@@ -775,6 +818,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .receiverQueueSize(4)
             .subscribe();
 
+        // Wait topic list watcher creation.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
+
         assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 0 number of partitions and 
topics: 6=1+2+3
@@ -861,6 +910,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .subscriptionName("sub")
             .subscribe();
 
+        // Wait topic list watcher creation.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
+
         assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
         PatternMultiTopicsConsumerImpl<String> consumerImpl = 
(PatternMultiTopicsConsumerImpl<String>) consumer;
 

Reply via email to