This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 3f2ebad22b2 [branch-2.9] Fixed key-shared delivery of messages with 
interleaved delays. (#18108)
3f2ebad22b2 is described below

commit 3f2ebad22b2f439d376d584100651adf90dbf471
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Oct 19 20:02:24 2022 +0800

    [branch-2.9] Fixed key-shared delivery of messages with interleaved delays. 
(#18108)
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  2 +-
 .../PersistentDispatcherMultipleConsumers.java     |  6 ++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 51 ++++++++++++----------
 .../service/persistent/DelayedDeliveryTest.java    | 43 ++++++++++++++++++
 4 files changed, 78 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 4a8842b15c1..7e4155c0613 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -266,7 +266,7 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
         synchronized (dispatcher) {
             lastTickRun = clock.millis();
             currentTimeoutTarget = -1;
-            timeout = null;
+            this.timeout = null;
             dispatcher.readMoreEntries();
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2fdb03cb194..a36b1eded3d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -277,7 +277,11 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                             consumerList.size());
                 }
                 havePendingRead = true;
-                minReplayedPosition = 
getMessagesToReplayNow(1).stream().findFirst().orElse(null);
+                Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
+                minReplayedPosition = 
toReplay.stream().findFirst().orElse(null);
+                if (minReplayedPosition != null) {
+                    redeliveryMessages.add(minReplayedPosition.getLedgerId(), 
minReplayedPosition.getEntryId());
+                }
                 cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, 
this,
                         ReadType.Normal, topic.getMaxReadPosition());
             } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 73eb031d60c..85b2f5163ce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -173,29 +173,36 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         // This may happen when consumer closed. See issue #12885 for details.
         if (!allowOutOfOrderDelivery) {
             Set<PositionImpl> messagesToReplayNow = 
this.getMessagesToReplayNow(1);
-            if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() 
&& this.minReplayedPosition != null) {
-                PositionImpl relayPosition = 
messagesToReplayNow.stream().findFirst().get();
-                // If relayPosition is a new entry wither smaller position is 
inserted for redelivery during this async
-                // read, it is possible that this relayPosition should 
dispatch to consumer first. So in order to
-                // preserver order delivery, we need to discard this read 
result, and try to trigger a replay read,
-                // that containing "relayPosition", by calling readMoreEntries.
-                if (relayPosition.compareTo(minReplayedPosition) < 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Position {} (<{}) is inserted for 
relay during current {} read, discard this "
-                                + "read and retry with readMoreEntries.",
-                                name, relayPosition, minReplayedPosition, 
readType);
-                    }
-                    if (readType == ReadType.Normal) {
-                        entries.forEach(entry -> {
-                            long stickyKeyHash = getStickyKeyHash(entry);
-                            addMessageToReplay(entry.getLedgerId(), 
entry.getEntryId(), stickyKeyHash);
-                            entry.release();
-                        });
-                    } else if (readType == ReadType.Replay) {
-                        entries.forEach(Entry::release);
+            if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) 
{
+                PositionImpl replayPosition = 
messagesToReplayNow.stream().findFirst().get();
+                // We have received a message potentially from the delayed 
tracker and, since we're not using it
+                // right now, it needs to be added to the redelivery tracker 
or we won't attempt anymore to
+                // resend it (until we disconnect consumer).
+                redeliveryMessages.add(replayPosition.getLedgerId(), 
replayPosition.getEntryId());
+
+                if (this.minReplayedPosition != null) {
+                    // If relayPosition is a new entry wither smaller position 
is inserted for redelivery during this
+                    // async read, it is possible that this relayPosition 
should dispatch to consumer first. So in
+                    // order to preserver order delivery, we need to discard 
this read result, and try to trigger a
+                    // replay read, that containing "relayPosition", by 
calling readMoreEntries.
+                    if (replayPosition.compareTo(minReplayedPosition) < 0) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Position {} (<{}) is inserted for 
relay during current {} read, "
+                                            + "discard this read and retry 
with readMoreEntries.",
+                                    name, replayPosition, minReplayedPosition, 
readType);
+                        }
+                        if (readType == ReadType.Normal) {
+                            entries.forEach(entry -> {
+                                long stickyKeyHash = getStickyKeyHash(entry);
+                                addMessageToReplay(entry.getLedgerId(), 
entry.getEntryId(), stickyKeyHash);
+                                entry.release();
+                            });
+                        } else if (readType == ReadType.Replay) {
+                            entries.forEach(Entry::release);
+                        }
+                        readMoreEntries();
+                        return;
                     }
-                    readMoreEntries();
-                    return;
                 }
             }
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 041818850ed..fc94f4d72c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -542,6 +543,48 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
         Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
     }
 
+    @Test
+    public void testInterleavedMessagesOnKeySharedSubscription() throws 
Exception {
+        String topic = 
BrokerTestUtil.newUniqueName("testInterleavedMessagesOnKeySharedSubscription");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("key-shared-sub")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Random random = new Random(0);
+        for (int i = 0; i < 10; i++) {
+            // Publish 1 message without delay and 1 with delay
+            producer.newMessage()
+                    .value("immediate-msg-" + i)
+                    .sendAsync();
+
+            int delayMillis = 1000 + random.nextInt(1000);
+            producer.newMessage()
+                    .value("delayed-msg-" + i)
+                    .deliverAfter(delayMillis, TimeUnit.MILLISECONDS)
+                    .sendAsync();
+            Thread.sleep(1000);
+        }
+
+        producer.flush();
+
+        Set<String> receivedMessages = new HashSet<>();
+
+        while (receivedMessages.size() < 20) {
+            Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+            receivedMessages.add(msg.getValue());
+            consumer.acknowledge(msg);
+        }
+    }
+
     @Test
     public void testDispatcherReadFailure() throws Exception {
         String topic = 
BrokerTestUtil.newUniqueName("testDispatcherReadFailure");

Reply via email to