equanz commented on issue #23307:
URL: https://github.com/apache/pulsar/issues/23307#issuecomment-2355133584
Further investigation may be needed, but I think this is an issue related to
redelivery.
I added the following patches.
```diff
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index b0717f1326..3ae52d71f0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -2348,7 +2348,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
// Adding a new consumer.
@Cleanup
- Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+ ConsumerImpl<Integer> c1 = (ConsumerImpl<Integer>)
pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c1")
.subscriptionName(subscriptionName)
@@ -2358,7 +2358,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+ ConsumerImpl<Integer> c2 = (ConsumerImpl<Integer>)
pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c2")
.subscriptionName(subscriptionName)
@@ -2367,7 +2367,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- Consumer<Integer> c3 = pulsarClient.newConsumer(Schema.INT32)
+ ConsumerImpl<Integer> c3 = (ConsumerImpl<Integer>)
pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c3")
.subscriptionName(subscriptionName)
@@ -2417,8 +2417,10 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
c1.resume();
c3.resume();
Thread.sleep(pauseTime);
+ log.info("c1 incomingMessages: {}",
c1.incomingMessages.toList().stream().map(Message::getMessageId).toList());
+ log.info("c3 incomingMessages: {}",
c3.incomingMessages.toList().stream().map(Message::getMessageId).toList());
// reconnect c2
- c2 = pulsarClient.newConsumer(Schema.INT32)
+ c2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c2")
.subscriptionName(subscriptionName)
@@ -2428,7 +2430,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
// close and reconnect c1
c1.close();
Thread.sleep(pauseTime);
- c1 = pulsarClient.newConsumer(Schema.INT32)
+ c1 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c1")
.subscriptionName(subscriptionName)
@@ -2438,7 +2440,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
// close and reconnect c3
c3.close();
Thread.sleep(pauseTime);
- c3 = pulsarClient.newConsumer(Schema.INT32)
+ c3 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.consumerName("c3")
.subscriptionName(subscriptionName)
@@ -2450,6 +2452,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
+ dispatcher.getNumberOfMessagesInReplay());
logTopicStats(topic);
+ log.info("c2 incomingMessages: {}",
c2.incomingMessages.toList().stream().map(Message::getMessageId).toList());
// produce more messages
for (int i = 1000; i < 2000; i++) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 9748a42f0c..a5d5cd7772 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -87,7 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected final ExecutorService externalPinnedExecutor;
protected final ExecutorService internalPinnedExecutor;
protected UnAckedMessageTracker unAckedMessageTracker;
- final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
+ public final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]>
unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives;
protected final int maxReceiverQueueSize;
```
And I ran the test, then failed as follows.
```
2024-09-17T18:15:31,906 - INFO -
[TestNG-method=testOrderingAfterReconnects-1:KeySharedSubscriptionTest] - c1
incomingMessages: [8:5:-1, 8:6:-1, 8:7:-1, 8:11:-1, 8:13:-1, 8:17:-1, 8:500:-1,
8:511:-1, 8:549:-1, 8:559:-1]
2024-09-17T18:15:31,906 - INFO -
[TestNG-method=testOrderingAfterReconnects-1:KeySharedSubscriptionTest] - c3
incomingMessages: [8:0:-1, 8:1:-1, 8:2:-1, 8:3:-1, 8:4:-1, 8:8:-1, 8:9:-1,
8:10:-1, 8:12:-1, 8:14:-1]
...
2024-09-17T18:15:32,186 - INFO -
[TestNG-method=testOrderingAfterReconnects-1:MockedPulsarServiceBaseTest] -
[testOrderingAfterReconnects-40ecbc0c-7f8f-4973-9513-c3bcadd0fb5b] stats: {
...
"consumersAfterMarkDeletePosition" : {
"consumerName=c2, consumerId=11, address=/127.0.0.1:63188" : "8:500",
"consumerName=c1, consumerId=12, address=/127.0.0.1:63188" : "8:500",
"consumerName=c3, consumerId=13, address=/127.0.0.1:63188" : "8:500"
},
"lastSentPosition" : "8:500",
"individuallySentPositions" :
"[(8:510..8:511],(8:548..8:549],(8:558..8:559]]",
...
2024-09-17T18:15:32,192 - INFO -
[TestNG-method=testOrderingAfterReconnects-1:KeySharedSubscriptionTest] - c2
incomingMessages: [8:15:-1, 8:16:-1, 8:18:-1, 8:19:-1, 8:20:-1, 8:21:-1,
8:22:-1, 8:23:-1, 8:24:-1, 8:25:-1]
2024-09-17T18:15:33,860 - ERROR -
[broker-test-util-executor-2-1:KeySharedSubscriptionTest] - key: 397 value: 2
prev: 8:21/c2 current: 8:2/c2
```
When the c2 is reconnected, the dispatcher sends replay messages to the c2.
After the c3 is disconnected, `8:2` will be resent to the c2.
(I didn't address redelivery messages in the PIP-282 because I think we
can't fully address redelivery messages' ordering. For example, the dispatcher
can't control the client-side redelivery operation such as unack `x:101` before
`x:100`. cf. https://github.com/apache/pulsar/pull/21953#discussion_r1519261144
)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]