equanz commented on issue #23307:
URL: https://github.com/apache/pulsar/issues/23307#issuecomment-2355133584

   Further investigation may be needed, but I think this is an issue related to 
redelivery.
   
   I applied the following patch:
   ```diff
   diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
   index b0717f1326..3ae52d71f0 100644
   --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
   +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
   @@ -2348,7 +2348,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
   
            // Adding a new consumer.
            @Cleanup
   -        Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
   +        ConsumerImpl<Integer> c1 = (ConsumerImpl<Integer>) 
pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .consumerName("c1")
                    .subscriptionName(subscriptionName)
   @@ -2358,7 +2358,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
                    .subscribe();
   
            @Cleanup
   -        Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
   +        ConsumerImpl<Integer> c2 = (ConsumerImpl<Integer>) 
pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .consumerName("c2")
                    .subscriptionName(subscriptionName)
   @@ -2367,7 +2367,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
                    .subscribe();
   
            @Cleanup
   -        Consumer<Integer> c3 = pulsarClient.newConsumer(Schema.INT32)
   +        ConsumerImpl<Integer> c3 = (ConsumerImpl<Integer>) 
pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .consumerName("c3")
                    .subscriptionName(subscriptionName)
   @@ -2417,8 +2417,10 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
            c1.resume();
            c3.resume();
            Thread.sleep(pauseTime);
   +        log.info("c1 incomingMessages: {}", 
c1.incomingMessages.toList().stream().map(Message::getMessageId).toList());
   +        log.info("c3 incomingMessages: {}", 
c3.incomingMessages.toList().stream().map(Message::getMessageId).toList());
            // reconnect c2
   -        c2 = pulsarClient.newConsumer(Schema.INT32)
   +        c2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .consumerName("c2")
                    .subscriptionName(subscriptionName)
   @@ -2428,7 +2430,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
            // close and reconnect c1
            c1.close();
            Thread.sleep(pauseTime);
   -        c1 = pulsarClient.newConsumer(Schema.INT32)
   +        c1 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .consumerName("c1")
                    .subscriptionName(subscriptionName)
   @@ -2438,7 +2440,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
            // close and reconnect c3
            c3.close();
            Thread.sleep(pauseTime);
   -        c3 = pulsarClient.newConsumer(Schema.INT32)
   +        c3 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .consumerName("c3")
                    .subscriptionName(subscriptionName)
   @@ -2450,6 +2452,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
                    + dispatcher.getNumberOfMessagesInReplay());
   
            logTopicStats(topic);
   +        log.info("c2 incomingMessages: {}", 
c2.incomingMessages.toList().stream().map(Message::getMessageId).toList());
   
            // produce more messages
            for (int i = 1000; i < 2000; i++) {
   diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
   index 9748a42f0c..a5d5cd7772 100644
   --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
   +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
   @@ -87,7 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
        protected final ExecutorService externalPinnedExecutor;
        protected final ExecutorService internalPinnedExecutor;
        protected UnAckedMessageTracker unAckedMessageTracker;
   -    final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
   +    public final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
        protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> 
unAckedChunkedMessageIdSequenceMap;
        protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives;
        protected final int maxReceiverQueueSize;
   ```
   
   Then I ran the test, and it failed as follows:
   ```
   2024-09-17T18:15:31,906 - INFO  - 
[TestNG-method=testOrderingAfterReconnects-1:KeySharedSubscriptionTest] - c1 
incomingMessages: [8:5:-1, 8:6:-1, 8:7:-1, 8:11:-1, 8:13:-1, 8:17:-1, 8:500:-1, 
8:511:-1, 8:549:-1, 8:559:-1]
   2024-09-17T18:15:31,906 - INFO  - 
[TestNG-method=testOrderingAfterReconnects-1:KeySharedSubscriptionTest] - c3 
incomingMessages: [8:0:-1, 8:1:-1, 8:2:-1, 8:3:-1, 8:4:-1, 8:8:-1, 8:9:-1, 
8:10:-1, 8:12:-1, 8:14:-1]
   ...
   2024-09-17T18:15:32,186 - INFO  - 
[TestNG-method=testOrderingAfterReconnects-1:MockedPulsarServiceBaseTest] - 
[testOrderingAfterReconnects-40ecbc0c-7f8f-4973-9513-c3bcadd0fb5b] stats: {
   ...
         "consumersAfterMarkDeletePosition" : {
           "consumerName=c2, consumerId=11, address=/127.0.0.1:63188" : "8:500",
           "consumerName=c1, consumerId=12, address=/127.0.0.1:63188" : "8:500",
           "consumerName=c3, consumerId=13, address=/127.0.0.1:63188" : "8:500"
         },
         "lastSentPosition" : "8:500",
         "individuallySentPositions" : 
"[(8:510..8:511],(8:548..8:549],(8:558..8:559]]",
   ...
   2024-09-17T18:15:32,192 - INFO  - 
[TestNG-method=testOrderingAfterReconnects-1:KeySharedSubscriptionTest] - c2 
incomingMessages: [8:15:-1, 8:16:-1, 8:18:-1, 8:19:-1, 8:20:-1, 8:21:-1, 
8:22:-1, 8:23:-1, 8:24:-1, 8:25:-1]
   2024-09-17T18:15:33,860 - ERROR - 
[broker-test-util-executor-2-1:KeySharedSubscriptionTest] - key: 397 value: 2 
prev: 8:21/c2 current: 8:2/c2
   ```
   
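   When c2 reconnects, the dispatcher sends replay messages to c2. After c3 disconnects, `8:2`, which was still in c3's incoming queue, is redelivered to c2. c2 has already received `8:21` for the same key, so it receives `8:2` after `8:21` and ordering is broken (`prev: 8:21/c2 current: 8:2/c2`).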
   
   (I didn't address the ordering of redelivered messages in PIP-282 because I don't think it can be fully guaranteed. For example, the dispatcher can't control client-side redelivery operations, such as a client unacking `x:101` before `x:100`. See https://github.com/apache/pulsar/pull/21953#discussion_r1519261144 .)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to