This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2d205c93cb9 [fix][test] flaky test
`testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction` (#18726)
2d205c93cb9 is described below
commit 2d205c93cb9e1324834e0818de0edc531b66a119
Author: labuladong <[email protected]>
AuthorDate: Wed Dec 7 14:48:44 2022 +0800
[fix][test] flaky test
`testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction` (#18726)
---
.../client/impl/KeySharedSubscriptionTest.java | 109 +++++++++------------
1 file changed, 45 insertions(+), 64 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
index 8838bd9e3a1..6f7d8d9c3f1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -29,14 +29,18 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -77,80 +81,50 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
AtomicLong lastActiveTime = new AtomicLong();
AtomicBoolean canAcknowledgement = new AtomicBoolean(false);
- @Cleanup
- Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("sub-1")
- .subscriptionType(subscriptionType)
- .consumerName("con-1")
- .messageListener((cons1, msg) -> {
- lastActiveTime.set(System.currentTimeMillis());
- nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>())
- .add(msg.getMessageId());
- recMessages.add(msg.getMessageId());
- if (canAcknowledgement.get()) {
- try {
- cons1.acknowledge(msg);
- } catch (PulsarClientException e) {
- throw new RuntimeException(e);
- }
- }
- })
- .subscribe();
- @Cleanup
- Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("sub-1")
- .subscriptionType(subscriptionType)
- .messageListener((cons2, msg) -> {
- lastActiveTime.set(System.currentTimeMillis());
- nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>())
- .add(msg.getMessageId());
- recMessages.add(msg.getMessageId());
- if (canAcknowledgement.get()) {
- try {
- cons2.acknowledge(msg);
- } catch (PulsarClientException e) {
- throw new RuntimeException(e);
- }
- }
- })
- .consumerName("con-2")
- .subscribe();
- @Cleanup
- Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("sub-1")
- .subscriptionType(subscriptionType)
- .messageListener((cons3, msg) -> {
- lastActiveTime.set(System.currentTimeMillis());
- nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>())
- .add(msg.getMessageId());
- recMessages.add(msg.getMessageId());
- if (canAcknowledgement.get()) {
- try {
- cons3.acknowledge(msg);
- } catch (PulsarClientException e) {
- throw new RuntimeException(e);
+ List<Consumer<?>> consumerList = new ArrayList<>();
+ // create 3 consumers
+ for (int i = 0; i < 3; i++) {
+ ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("sub-1")
+ .subscriptionType(subscriptionType)
+ .messageListener((consumer, msg) -> {
+ lastActiveTime.set(System.currentTimeMillis());
+ nameToId.computeIfAbsent(consumer, (k) -> new
ArrayList<>())
+ .add(msg.getMessageId());
+ recMessages.add(msg.getMessageId());
+ if (canAcknowledgement.get()) {
+ try {
+ consumer.acknowledge(msg);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
}
- }
- })
- .consumerName("con-3")
- .subscribe();
+ });
+
+ if (subscriptionType == SubscriptionType.Key_Shared) {
+ // ensure every consumer owns the hash of one producer key, so each consumer receives messages
+ int hash = Murmur3_32Hash.getInstance().makeHash(("key-" +
i).getBytes())
+ % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
+
builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash,
hash)));
+ }
+
+ consumerList.add(builder.subscribe());
+ }
- @Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
// We chose 9 because the maximum unacked message is 10
.batchingMaxMessages(9)
+ .batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
for (int i = 0; i < totalMsg; i++) {
- producer.sendAsync(UUID.randomUUID().toString()
- .getBytes(StandardCharsets.UTF_8))
- .thenAccept(pubMessages::add);
+ byte[] msg =
UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+ producer.newMessage().key("key-" + (i % 3)).value(msg)
+ .sendAsync().thenAccept(pubMessages::add);
}
// Wait until no consumer can read more messages; the consumers
are stuck by max unacked messages.
@@ -172,6 +146,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
// Wait for all consumers to continue receiving messages.
Awaitility.await()
+ .atMost(15, TimeUnit.SECONDS)
.pollDelay(5, TimeUnit.SECONDS)
.until(() ->
(System.currentTimeMillis() - lastActiveTime.get()) >
TimeUnit.SECONDS.toMillis(5));
@@ -181,5 +156,11 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
Assert.assertEquals(pubMessages.size(), totalMsg);
Assert.assertEquals(pubMessages.size(), recMessages.size());
Assert.assertTrue(recMessages.containsAll(pubMessages));
+
+ // cleanup
+ producer.close();
+ for (Consumer<?> consumer : consumerList) {
+ consumer.close();
+ }
}
}