This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d205c93cb9 [fix][test] flaky test 
`testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction` (#18726)
2d205c93cb9 is described below

commit 2d205c93cb9e1324834e0818de0edc531b66a119
Author: labuladong <[email protected]>
AuthorDate: Wed Dec 7 14:48:44 2022 +0800

    [fix][test] flaky test 
`testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction` (#18726)
---
 .../client/impl/KeySharedSubscriptionTest.java     | 109 +++++++++------------
 1 file changed, 45 insertions(+), 64 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
index 8838bd9e3a1..6f7d8d9c3f1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -29,14 +29,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import lombok.Cleanup;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -77,80 +81,50 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         AtomicLong lastActiveTime = new AtomicLong();
         AtomicBoolean canAcknowledgement = new AtomicBoolean(false);
 
-        @Cleanup
-        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("sub-1")
-                .subscriptionType(subscriptionType)
-                .consumerName("con-1")
-                .messageListener((cons1, msg) -> {
-                    lastActiveTime.set(System.currentTimeMillis());
-                    nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>())
-                            .add(msg.getMessageId());
-                    recMessages.add(msg.getMessageId());
-                    if (canAcknowledgement.get()) {
-                        try {
-                            cons1.acknowledge(msg);
-                        } catch (PulsarClientException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                })
-                .subscribe();
-        @Cleanup
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("sub-1")
-                .subscriptionType(subscriptionType)
-                .messageListener((cons2, msg) -> {
-                    lastActiveTime.set(System.currentTimeMillis());
-                    nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>())
-                            .add(msg.getMessageId());
-                    recMessages.add(msg.getMessageId());
-                    if (canAcknowledgement.get()) {
-                        try {
-                            cons2.acknowledge(msg);
-                        } catch (PulsarClientException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                })
-                .consumerName("con-2")
-                .subscribe();
-        @Cleanup
-        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("sub-1")
-                .subscriptionType(subscriptionType)
-                .messageListener((cons3, msg) -> {
-                    lastActiveTime.set(System.currentTimeMillis());
-                    nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>())
-                            .add(msg.getMessageId());
-                    recMessages.add(msg.getMessageId());
-                    if (canAcknowledgement.get()) {
-                        try {
-                            cons3.acknowledge(msg);
-                        } catch (PulsarClientException e) {
-                            throw new RuntimeException(e);
+        List<Consumer<?>> consumerList = new ArrayList<>();
+        // create 3 consumers
+        for (int i = 0; i < 3; i++) {
+            ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer()
+                    .topic(topic)
+                    .subscriptionName("sub-1")
+                    .subscriptionType(subscriptionType)
+                    .messageListener((consumer, msg) -> {
+                        lastActiveTime.set(System.currentTimeMillis());
+                        nameToId.computeIfAbsent(consumer, (k) -> new 
ArrayList<>())
+                                .add(msg.getMessageId());
+                        recMessages.add(msg.getMessageId());
+                        if (canAcknowledgement.get()) {
+                            try {
+                                consumer.acknowledge(msg);
+                            } catch (PulsarClientException e) {
+                                throw new RuntimeException(e);
+                            }
                         }
-                    }
-                })
-                .consumerName("con-3")
-                .subscribe();
+                    });
+
+            if (subscriptionType == SubscriptionType.Key_Shared) {
+                // ensure every consumer can be distributed messages
+                int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + 
i).getBytes())
+                        % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
+                
builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, 
hash)));
+            }
+
+            consumerList.add(builder.subscribe());
+        }
 
-        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
                 // We chose 9 because the maximum unacked message is 10
                 .batchingMaxMessages(9)
+                .batcherBuilder(BatcherBuilder.KEY_BASED)
                 .create();
 
         for (int i = 0; i < totalMsg; i++) {
-            producer.sendAsync(UUID.randomUUID().toString()
-                            .getBytes(StandardCharsets.UTF_8))
-                    .thenAccept(pubMessages::add);
+            byte[] msg = 
UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+            producer.newMessage().key("key-" + (i % 3)).value(msg)
+                    .sendAsync().thenAccept(pubMessages::add);
         }
 
         // Wait for all consumers can not read more messages. the consumers 
are stuck by max unacked messages.
@@ -172,6 +146,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
         // Wait for all consumers to continue receiving messages.
         Awaitility.await()
+                .atMost(15, TimeUnit.SECONDS)
                 .pollDelay(5, TimeUnit.SECONDS)
                 .until(() ->
                         (System.currentTimeMillis() - lastActiveTime.get()) > 
TimeUnit.SECONDS.toMillis(5));
@@ -181,5 +156,11 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         Assert.assertEquals(pubMessages.size(), totalMsg);
         Assert.assertEquals(pubMessages.size(), recMessages.size());
         Assert.assertTrue(recMessages.containsAll(pubMessages));
+
+        // cleanup
+        producer.close();
+        for (Consumer<?> consumer : consumerList) {
+            consumer.close();
+        }
     }
 }

Reply via email to