This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4a5bf97af4066c99050e1fd4c988ea3854afd6c8
Author: Oneby Wang <[email protected]>
AuthorDate: Fri May 22 22:10:26 2026 +0800

    [fix][test] Fix flaky 
ResendRequestTest.testSharedSingleAckedPartitionedTopic() test (#25852)
    
    (cherry picked from commit 11f7bcdb4ff7fd10d1b278626609c7bd07cc9c2f)
---
 .../pulsar/broker/service/ResendRequestTest.java   | 59 ++++++++++++----------
 1 file changed, 31 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 7ffec5933d7..97610420190 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
@@ -493,37 +494,41 @@ public class ResendRequestTest extends 
SharedPulsarBaseTest {
         Random rn = new Random();
 
         // 1. producer connect
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
-
-        // 2. Create consumer
-        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
-
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient();
-        Consumer<byte[]> consumer2 = 
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
-        // 3. producer publish messages
+        // 2. producer publish messages
         for (int i = 0; i < totalMessages; i++) {
             String message = messagePredicate + i;
             log.info("Message produced: " + message);
             producer.send(message.getBytes());
         }
 
+        // 3. Create consumer
+        @Cleanup
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                
.receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(
+                        SubscriptionInitialPosition.Earliest).subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient();
+        Consumer<byte[]> consumer2 = 
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                
.receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(
+                        SubscriptionInitialPosition.Earliest).subscribe();
+
         // 4. Receive messages
-        // Use timeouts on the initial receives — with a Shared subscription 
the broker may
-        // dispatch all messages to a single consumer (receiverQueueSize is 
larger than the
-        // number of messages per partition), and a blocking receive() would 
hang.
-        Message<byte[]> message1 = consumer1.receive(5000, 
TimeUnit.MILLISECONDS);
-        Message<byte[]> message2 = consumer2.receive(5000, 
TimeUnit.MILLISECONDS);
         int messageCount1 = 0;
         int messageCount2 = 0;
         int ackCount1 = 0;
         int ackCount2 = 0;
-        do {
+        while (true) {
+            Message<byte[]> message1 = consumer1.receive(500, 
TimeUnit.MILLISECONDS);
+            Message<byte[]> message2 = consumer2.receive(500, 
TimeUnit.MILLISECONDS);
+            if (message1 == null && message2 == null) {
+                break;
+            }
             if (message1 != null) {
                 log.info("Consumer1 received " + new 
String(message1.getData()));
                 messageCount1 += 1;
@@ -542,9 +547,7 @@ public class ResendRequestTest extends SharedPulsarBaseTest 
{
                     ackCount2 += 1;
                 }
             }
-            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
+        }
         log.info("messageCount1 = " + messageCount1);
         log.info("messageCount2 = " + messageCount2);
         log.info("ackCount1 = " + ackCount1);
@@ -559,10 +562,13 @@ public class ResendRequestTest extends 
SharedPulsarBaseTest {
         }
 
         // 6. Check if Messages redelivered again
-        message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS);
-        message2 = consumer2.receive(5000, TimeUnit.MILLISECONDS);
         messageCount1 = 0;
-        do {
+        while (true) {
+            Message<byte[]> message1 = consumer1.receive(500, 
TimeUnit.MILLISECONDS);
+            Message<byte[]> message2 = consumer2.receive(500, 
TimeUnit.MILLISECONDS);
+            if (message1 == null && message2 == null) {
+                break;
+            }
             if (message1 != null) {
                 log.info("Consumer1 received " + new 
String(message1.getData()));
                 messageCount1 += 1;
@@ -571,10 +577,7 @@ public class ResendRequestTest extends 
SharedPulsarBaseTest {
                 log.info("Consumer2 received " + new 
String(message2.getData()));
                 messageCount2 += 1;
             }
-            message1 = consumer1.receive(1000, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(1000, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
-
+        }
         log.info("messageCount1 = " + messageCount1);
         log.info("messageCount2 = " + messageCount2);
         log.info("ackCount1 = " + ackCount1);

Reply via email to