This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 11f7bcdb4ff [fix][test] Fix flaky ResendRequestTest.testSharedSingleAckedPartitionedTopic() test (#25852)
11f7bcdb4ff is described below
commit 11f7bcdb4ff7fd10d1b278626609c7bd07cc9c2f
Author: Oneby Wang <[email protected]>
AuthorDate: Fri May 22 22:10:26 2026 +0800
[fix][test] Fix flaky ResendRequestTest.testSharedSingleAckedPartitionedTopic() test (#25852)
---
.../pulsar/broker/service/ResendRequestTest.java | 59 ++++++++++++----------
1 file changed, 31 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 4292b9236c8..4f31d4c8701 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
@@ -492,37 +493,41 @@ public class ResendRequestTest extends
SharedPulsarBaseTest {
Random rn = new Random();
// 1. producer connect
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
- .enableBatching(false)
-
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
-
- // 2. Create consumer
- Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
-
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient();
- Consumer<byte[]> consumer2 =
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false)
+
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
- // 3. producer publish messages
+ // 2. producer publish messages
for (int i = 0; i < totalMessages; i++) {
String message = messagePredicate + i;
log.info().attr("message", message).log("Message produced");
producer.send(message.getBytes());
}
+ // 3. Create consumer
+ @Cleanup
+ Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+
.receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(
+ SubscriptionInitialPosition.Earliest).subscribe();
+
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient();
+ Consumer<byte[]> consumer2 =
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+
.receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(
+ SubscriptionInitialPosition.Earliest).subscribe();
+
// 4. Receive messages
- // Use timeouts on the initial receives — with a Shared subscription
the broker may
- // dispatch all messages to a single consumer (receiverQueueSize is
larger than the
- // number of messages per partition), and a blocking receive() would
hang.
- Message<byte[]> message1 = consumer1.receive(5000,
TimeUnit.MILLISECONDS);
- Message<byte[]> message2 = consumer2.receive(5000,
TimeUnit.MILLISECONDS);
int messageCount1 = 0;
int messageCount2 = 0;
int ackCount1 = 0;
int ackCount2 = 0;
- do {
+ while (true) {
+ Message<byte[]> message1 = consumer1.receive(500,
TimeUnit.MILLISECONDS);
+ Message<byte[]> message2 = consumer2.receive(500,
TimeUnit.MILLISECONDS);
+ if (message1 == null && message2 == null) {
+ break;
+ }
if (message1 != null) {
log.info().attr("data", new
String(message1.getData())).log("Consumer1 received");
messageCount1 += 1;
@@ -541,9 +546,7 @@ public class ResendRequestTest extends SharedPulsarBaseTest
{
ackCount2 += 1;
}
}
- message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
- message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
- } while (message1 != null || message2 != null);
+ }
log.info().attr("messageCount1", messageCount1).log("messageCount1");
log.info().attr("messageCount2", messageCount2).log("messageCount2");
log.info().attr("ackCount1", ackCount1).log("ackCount1");
@@ -558,10 +561,13 @@ public class ResendRequestTest extends
SharedPulsarBaseTest {
}
// 6. Check if Messages redelivered again
- message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS);
- message2 = consumer2.receive(5000, TimeUnit.MILLISECONDS);
messageCount1 = 0;
- do {
+ while (true) {
+ Message<byte[]> message1 = consumer1.receive(500,
TimeUnit.MILLISECONDS);
+ Message<byte[]> message2 = consumer2.receive(500,
TimeUnit.MILLISECONDS);
+ if (message1 == null && message2 == null) {
+ break;
+ }
if (message1 != null) {
log.info().attr("data", new
String(message1.getData())).log("Consumer1 received");
messageCount1 += 1;
@@ -570,10 +576,7 @@ public class ResendRequestTest extends
SharedPulsarBaseTest {
log.info().attr("data", new
String(message2.getData())).log("Consumer2 received");
messageCount2 += 1;
}
- message1 = consumer1.receive(1000, TimeUnit.MILLISECONDS);
- message2 = consumer2.receive(1000, TimeUnit.MILLISECONDS);
- } while (message1 != null || message2 != null);
-
+ }
log.info().attr("messageCount1", messageCount1).log("messageCount1");
log.info().attr("messageCount2", messageCount2).log("messageCount2");
log.info().attr("ackCount1", ackCount1).log("ackCount1");