This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 0d383ba  [broker] Fix issue that message ordering could be broken when 
redelivering messages on Key_Shared subscription (#10762)
0d383ba is described below

commit 0d383ba91636554c67bf0fc5edef8eb22afad77f
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Jul 15 20:44:54 2021 +0900

    [broker] Fix issue that message ordering could be broken when redelivering 
messages on Key_Shared subscription (#10762)
    
    Messages with the same key can be out of order if message redelivery occurs 
on a Key_Shared subscription.
    
    1. Suppose `PersistentDispatcherMultipleConsumers#messagesToRedeliver` 
contains message-1 and message-2. Message-1 will be delivered to consumer-a and 
message-2 will be delivered to consumer-b.
    2. The dispatcher tried to send message-1 to consumer-a, but the consumer 
was too slow to receive it.
    3. Consumer-a is added to `stuckConsumers`.
    
https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L263-L266
    4. The next time `readMoreEntries()` is run, `getMessagesToReplayNow()` 
will return an empty Set because `isDispatcherStuckOnReplays` is true.
    
https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L368-L374
    5. The dispatcher reads newer messages instead of the messages contained in 
`messagesToRedeliver`.
    
https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L233-L267
    6. A new message (message-3) is delivered to consumer-b.
    7. Message-2 contained in messagesToRedeliver is delivered to consumer-b.
    8. As a result, the order of message-2 and message-3 is reversed.
    
    When adding a message to be redelivered to `messagesToRedeliver`, save the 
hash of the key that the message has. If the dispatcher then attempts to send 
a consumer newer messages whose key corresponds to any one of the saved hash 
values, those messages are added to `messagesToRedeliver` instead of being 
sent. This prevents messages with the same key from being out of order.
    
    (cherry picked from commit 5aee5998f6b9e500873e1290f85a21eb11d1cd63)
---
 ...ConsistentHashingStickyKeyConsumerSelector.java |   4 +-
 .../org/apache/pulsar/broker/service/Consumer.java |  18 +-
 ...ashRangeAutoSplitStickyKeyConsumerSelector.java |   4 +-
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |   9 +-
 .../broker/service/StickyKeyConsumerSelector.java  |  17 +-
 .../persistent/MessageRedeliveryController.java    | 120 ++++++++++++
 .../PersistentDispatcherMultipleConsumers.java     |  98 +++++-----
 ...istentStickyKeyDispatcherMultipleConsumers.java |  29 ++-
 .../broker/service/PersistentTopicE2ETest.java     |  21 +-
 .../MessageRedeliveryControllerTest.java           | 213 +++++++++++++++++++++
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 152 ++++++++++++++-
 .../apache/pulsar/common/protocol/Commands.java    |  18 ++
 12 files changed, 614 insertions(+), 89 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index 96029d8..e280048 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -105,9 +105,7 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
     }
 
     @Override
-    public Consumer select(byte[] stickyKey) {
-        int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
-
+    public Consumer select(int hash) {
         rwLock.readLock().lock();
         try {
             if (hashRing.isEmpty()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 475fdc9..b6557f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -54,6 +54,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -236,7 +237,8 @@ public class Consumer {
                 Entry entry = entries.get(i);
                 if (entry != null) {
                     int batchSize = batchSizes.getBatchSize(i);
-                    pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), 
batchSize, 0);
+                    int stickyKeyHash = getStickyKeyHash(entry);
+                    pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), 
batchSize, stickyKeyHash);
                     if (log.isDebugEnabled()){
                         log.debug("[{}-{}] Added {}:{} ledger entry with 
batchSize of {} to pendingAcks in"
                                         + " broker.service.Consumer for 
consumerId: {}",
@@ -729,7 +731,7 @@ public class Consumer {
         if (pendingAcks != null) {
             List<PositionImpl> pendingPositions = new ArrayList<>((int) 
pendingAcks.size());
             MutableInt totalRedeliveryMessages = new MutableInt(0);
-            pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> {
+            pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) 
-> {
                 totalRedeliveryMessages.add((int) batchSize);
                 pendingPositions.add(new PositionImpl(ledgerId, entryId));
             });
@@ -752,10 +754,11 @@ public class Consumer {
         List<PositionImpl> pendingPositions = Lists.newArrayList();
         for (MessageIdData msg : messageIds) {
             PositionImpl position = PositionImpl.get(msg.getLedgerId(), 
msg.getEntryId());
-            LongPair batchSize = pendingAcks.get(position.getLedgerId(), 
position.getEntryId());
-            if (batchSize != null) {
+            LongPair longPair = pendingAcks.get(position.getLedgerId(), 
position.getEntryId());
+            if (longPair != null) {
+                long batchSize = longPair.first;
                 pendingAcks.remove(position.getLedgerId(), 
position.getEntryId());
-                totalRedeliveryMessages += batchSize.first;
+                totalRedeliveryMessages += batchSize;
                 pendingPositions.add(position);
             }
         }
@@ -810,5 +813,10 @@ public class Consumer {
         return cnx;
     }
 
+    private int getStickyKeyHash(Entry entry) {
+        byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), 
topicName, subscription.getName());
+        return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Consumer.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index f74165e..b715c51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.service;
 
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -104,8 +103,7 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector 
implements StickyKeyCon
     }
 
     @Override
-    public Consumer select(byte[] stickyKey) {
-        int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
+    public Consumer select(int hash) {
         if (rangeMap.size() > 0) {
             int slot = hash % rangeSize;
             return rangeMap.ceilingEntry(slot).getValue();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index 75cb6b5..d8aea2e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.service;
 
 import org.apache.pulsar.common.api.proto.PulsarApi;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -66,11 +65,6 @@ public class HashRangeExclusiveStickyKeyConsumerSelector 
implements StickyKeyCon
     }
 
     @Override
-    public Consumer select(byte[] stickyKey) {
-        return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
-    }
-
-    @Override
     public Map<String, List<String>> getConsumerKeyHashRanges() {
         Map<String, List<String>> result = new HashMap<>();
         Map.Entry<Integer, Consumer> prev = null;
@@ -88,7 +82,8 @@ public class HashRangeExclusiveStickyKeyConsumerSelector 
implements StickyKeyCon
         return result;
     }
 
-    Consumer select(int hash) {
+    @Override
+    public Consumer select(int hash) {
         if (rangeMap.size() > 0) {
             int slot = hash % rangeSize;
             Map.Entry<Integer, Consumer> ceilingEntry = 
rangeMap.ceilingEntry(slot);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 0d686f7..5ea6f7f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -22,6 +22,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignExc
 
 import java.util.List;
 import java.util.Map;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 public interface StickyKeyConsumerSelector {
 
@@ -45,7 +46,21 @@ public interface StickyKeyConsumerSelector {
      * @param stickyKey sticky key
      * @return consumer
      */
-    Consumer select(byte[] stickyKey);
+    default Consumer select(byte[] stickyKey) {
+        return select(makeStickyKeyHash(stickyKey));
+    }
+
+    static int makeStickyKeyHash(byte[] stickyKey) {
+        return Murmur3_32Hash.getInstance().makeHash(stickyKey);
+    }
+
+    /**
+     * Select a consumer by hash.
+     *
+     * @param hash hash corresponding to sticky key
+     * @return consumer
+     */
+    Consumer select(int hash);
 
     /**
      * Get key hash ranges handled by each consumer
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
new file mode 100644
index 0000000..be14356
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import 
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new 
ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            List<LongPair> keysToRemove = new ArrayList<>();
+            hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) 
-> {
+                if (ComparisonChain.start().compare(ledgerId, 
markDeleteLedgerId).compare(entryId, markDeleteEntryId)
+                        .result() <= 0) {
+                    keysToRemove.add(new LongPair(ledgerId, entryId));
+                }
+            });
+            keysToRemove.forEach(longPair -> 
hashesToBeBlocked.remove(longPair.first, longPair.second));
+            keysToRemove.clear();
+        }
+        return messagesToRedeliver.removeIf((ledgerId, entryId) -> {
+            return ComparisonChain.start().compare(ledgerId, 
markDeleteLedgerId).compare(entryId, markDeleteEntryId)
+                    .result() <= 0;
+        });
+    }
+
+    public boolean isEmpty() {
+        return messagesToRedeliver.isEmpty();
+    }
+
+    public void clear() {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.clear();
+        }
+        messagesToRedeliver.clear();
+    }
+
+    public String toString() {
+        return messagesToRedeliver.toString();
+    }
+
+    public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
+        final AtomicBoolean isContained = new AtomicBoolean(false);
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) 
-> {
+                if (!isContained.get() && stickyKeyHashes.contains((int) 
stickyKeyHash)) {
+                    isContained.set(true);
+                }
+            });
+        }
+        return isContained.get();
+    }
+
+    public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+        if (hashesToBeBlocked != null) {
+            // allowOutOfOrderDelivery is false
+            return messagesToRedeliver.items().stream()
+                    .sorted((l1, l2) -> 
ComparisonChain.start().compare(l1.first, l2.first)
+                            .compare(l1.second, l2.second).result())
+                    .limit(maxMessagesToRead).map(longPair -> new 
PositionImpl(longPair.first, longPair.second))
+                    .collect(Collectors.toCollection(TreeSet::new));
+        } else {
+            // allowOutOfOrderDelivery is true
+            return messagesToRedeliver.items(maxMessagesToRead,
+                    (ledgerId, entryId) -> new PositionImpl(ledgerId, 
entryId));
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6b2b4ec..adc5b2b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -20,10 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
-
-import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -57,19 +54,17 @@ import 
org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import 
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
-import org.apache.pulsar.common.util.collections.LongPairSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +77,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     protected volatile Range<PositionImpl> 
lastIndividualDeletedRangeFromCursorRecovery;
 
     private CompletableFuture<Void> closeFuture = null;
-    LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+    protected final MessageRedeliveryController redeliveryMessages;
     protected final RedeliveryTracker redeliveryTracker;
 
     private Optional<DelayedDeliveryTracker> delayedDeliveryTracker = 
Optional.empty();
@@ -109,12 +104,19 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         Normal, Replay
     }
 
-    public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor, Subscription subscription) {
+    public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
+            Subscription subscription) {
+        this(topic, cursor, subscription, true);
+    }
+
+    public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor, Subscription subscription,
+            boolean allowOutOfOrderDelivery) {
         super(subscription, 
topic.getBrokerService().pulsar().getConfiguration());
         this.cursor = cursor;
         this.lastIndividualDeletedRangeFromCursorRecovery = 
cursor.getLastIndividualDeletedRange();
         this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
         this.topic = topic;
+        this.redeliveryMessages = new 
MessageRedeliveryController(allowOutOfOrderDelivery);
         this.redeliveryTracker = 
this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
                 ? new InMemoryRedeliveryTracker()
                 : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
@@ -137,7 +139,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 cursor.rewind();
                 shouldRewindBeforeReadingOrReplaying = false;
             }
-            messagesToRedeliver.clear();
+            redeliveryMessages.clear();
         }
 
         if (isConsumersExceededOnSubscription()) {
@@ -195,7 +197,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                     havePendingRead = false;
                 }
 
-                messagesToRedeliver.clear();
+                redeliveryMessages.clear();
                 redeliveryTracker.clear();
                 if (closeFuture != null) {
                     log.info("[{}] All consumers removed. Subscription is 
disconnected", name);
@@ -206,8 +208,8 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Consumer are left, reading more entries", 
name);
                 }
-                consumer.getPendingAcks().forEach((ledgerId, entryId, 
batchSize, none) -> {
-                    if (addMessageToReplay(ledgerId, entryId)) {
+                consumer.getPendingAcks().forEach((ledgerId, entryId, 
batchSize, stickyKeyHash) -> {
+                    if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) {
                         
redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
                     }
                 });
@@ -333,9 +335,9 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                         asyncReplayEntriesInOrder(messagesToReplayNow) : 
asyncReplayEntries(messagesToReplayNow);
                 // clear already acked positions from replay bucket
 
-                deletedMessages.forEach(position -> 
messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
+                deletedMessages.forEach(position -> 
redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
                         ((PositionImpl) position).getEntryId()));
-                // if all the entries are acked-entries and cleared up from 
messagesToRedeliver, try to read
+                // if all the entries are acked-entries and cleared up from 
redeliveryMessages, try to read
                 // next entries as readCompletedEntries-callback was never 
called
                 if ((messagesToReplayNow.size() - deletedMessages.size()) == 
0) {
                     havePendingReplayRead = false;
@@ -516,7 +518,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 // remove positions first from replay list first : 
sendMessages recycles entries
                 if (readType == ReadType.Replay) {
                     entries.subList(start, start + messagesForC).forEach(entry 
-> {
-                        messagesToRedeliver.remove(entry.getLedgerId(), 
entry.getEntryId());
+                                redeliveryMessages.remove(entry.getLedgerId(), 
entry.getEntryId());
                     });
                 }
 
@@ -563,7 +565,8 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                         entries.size() - start);
             }
             entries.subList(start, entries.size()).forEach(entry -> {
-                addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
+                long stickyKeyHash = getStickyKeyHash(entry);
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
                 entry.release();
             });
         }
@@ -609,11 +612,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             havePendingReplayRead = false;
             if (exception instanceof 
ManagedLedgerException.InvalidReplayPositionException) {
                 PositionImpl markDeletePosition = (PositionImpl) 
cursor.getMarkDeletedPosition();
-
-                messagesToRedeliver.removeIf((ledgerId, entryId) -> {
-                    return ComparisonChain.start().compare(ledgerId, 
markDeletePosition.getLedgerId())
-                            .compare(entryId, 
markDeletePosition.getEntryId()).result() <= 0;
-                });
+                
redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
             }
         }
 
@@ -679,12 +678,12 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer) {
-        consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) 
-> {
-            addMessageToReplay(ledgerId, entryId);
+        consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
+            addMessageToReplay(ledgerId, entryId, stickyKeyHash);
         });
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for 
consumer {}", name, consumer,
-                    messagesToRedeliver);
+                    redeliveryMessages);
         }
         readMoreEntries();
     }
@@ -692,6 +691,8 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, List<PositionImpl> positions) {
         positions.forEach(position -> {
+            // TODO: We want to pass a sticky key hash as a third argument to 
guarantee the order of the messages
+            // on Key_Shared subscription, but it's difficult to get the 
sticky key here
             if (addMessageToReplay(position.getLedgerId(), 
position.getEntryId())) {
                 redeliveryTracker.addIfAbsent(position);
             }
@@ -793,29 +794,9 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         }
     }
 
-    @Override
-    public boolean trackDelayedDelivery(long ledgerId, long entryId, 
MessageMetadata msgMetadata) {
-        if (!topic.isDelayedDeliveryEnabled()) {
-            // If broker has the feature disabled, always deliver messages 
immediately
-            return false;
-        }
-
-        synchronized (this) {
-            if (!delayedDeliveryTracker.isPresent()) {
-                // Initialize the tracker the first time we need to use it
-                delayedDeliveryTracker = Optional
-                        
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
-            }
-
-            
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
-            return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, 
msgMetadata.getDeliverAtTime());
-        }
-    }
-
     protected synchronized Set<PositionImpl> getMessagesToReplayNow(int 
maxMessagesToRead) {
-        if (!messagesToRedeliver.isEmpty()) {
-            return messagesToRedeliver.items(maxMessagesToRead,
-                    (ledgerId, entryId) -> new PositionImpl(ledgerId, 
entryId));
+        if (!redeliveryMessages.isEmpty()) {
+            return 
redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
         } else if (delayedDeliveryTracker.isPresent() && 
delayedDeliveryTracker.get().hasMessageAvailable()) {
             
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
             return 
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
@@ -841,20 +822,37 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         }
     }
 
+    protected boolean addMessageToReplay(long ledgerId, long entryId, long 
stickyKeyHash) {
+        if (checkIfMessageIsUnacked(ledgerId, entryId)) {
+            redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
     protected boolean addMessageToReplay(long ledgerId, long entryId) {
-        PositionImpl markDeletePosition = (PositionImpl) 
cursor.getMarkDeletedPosition();
-        if (markDeletePosition == null || ledgerId > 
markDeletePosition.getLedgerId()
-                || (ledgerId == markDeletePosition.getLedgerId() && entryId > 
markDeletePosition.getEntryId())) {
-            messagesToRedeliver.add(ledgerId, entryId);
+        if (checkIfMessageIsUnacked(ledgerId, entryId)) {
+            redeliveryMessages.add(ledgerId, entryId);
             return true;
         } else {
             return false;
         }
     }
 
+    private boolean checkIfMessageIsUnacked(long ledgerId, long entryId) {
+        PositionImpl markDeletePosition = (PositionImpl) 
cursor.getMarkDeletedPosition();
+        return (markDeletePosition == null || ledgerId > 
markDeletePosition.getLedgerId()
+                || (ledgerId == markDeletePosition.getLedgerId() && entryId > 
markDeletePosition.getEntryId()));
+    }
+
     public PersistentTopic getTopic() {
         return topic;
     }
 
+    protected int getStickyKeyHash(Entry entry) {
+        return 
StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 45921dd..299f545 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -72,7 +72,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
 
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
             Subscription subscription, ServiceConfiguration conf, 
KeySharedMeta ksm) {
-        super(topic, cursor, subscription);
+        super(topic, cursor, subscription, ksm.getAllowOutOfOrderDelivery());
 
         this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
         this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new 
LinkedHashMap<>();
@@ -125,7 +125,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
     public synchronized void removeConsumer(Consumer consumer) throws 
BrokerServiceException {
         // The consumer must be removed from the selector before calling the 
superclass removeConsumer method.
         // In the superclass removeConsumer method, the pending acks that the 
consumer has are added to
-        // messagesToRedeliver. If the consumer has not been removed from the 
selector at this point,
+        // redeliveryMessages. If the consumer has not been removed from the 
selector at this point,
         // the broker will try to redeliver the messages to the consumer that 
has already been closed.
         // As a result, the messages are not redelivered to any consumer, and 
the mark-delete position does not move,
         // eventually causing all consumers to get stuck.
@@ -136,7 +136,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             if (consumerList.size() == 1) {
                 recentlyJoinedConsumers.clear();
             }
-            if (removeConsumersFromRecentJoinedConsumers() || 
messagesToRedeliver.size() > 0) {
+            if (removeConsumersFromRecentJoinedConsumers() || 
!redeliveryMessages.isEmpty()) {
                 readMoreEntries();
             }
         }
@@ -171,10 +171,13 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
         final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();
         groupedEntries.clear();
+        final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new 
HashMap<>();
 
         for (Entry entry : entries) {
-            Consumer c = selector.select(peekStickyKey(entry.getDataBuffer()));
+            int stickyKeyHash = getStickyKeyHash(entry);
+            Consumer c = selector.select(stickyKeyHash);
             groupedEntries.computeIfAbsent(c, k -> new 
ArrayList<>()).add(entry);
+            consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new 
HashSet<>()).add(stickyKeyHash);
         }
 
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
@@ -189,7 +192,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             int entriesWithSameKeyCount = entriesWithSameKey.size();
             final int availablePermits = consumer == null ? 0 : 
Math.max(consumer.getAvailablePermits(), 0);
             int maxMessagesForC = Math.min(entriesWithSameKeyCount, 
availablePermits);
-            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, 
entriesWithSameKey, maxMessagesForC, readType);
+            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, 
entriesWithSameKey, maxMessagesForC,
+                    readType, consumerStickyKeyHashesMap.get(consumer));
             if (log.isDebugEnabled()) {
                 log.debug("[{}] select consumer {} with messages num {}, read 
type is {}",
                         name, consumer.consumerName(), messagesForC, readType);
@@ -200,7 +204,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 // so we discard for now and mark them for later redelivery
                 for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
                     Entry entry = entriesWithSameKey.get(i);
-                    addMessageToReplay(entry.getLedgerId(), 
entry.getEntryId());
+                    long stickyKeyHash = getStickyKeyHash(entry);
+                    addMessageToReplay(entry.getLedgerId(), 
entry.getEntryId(), stickyKeyHash);
                     entry.release();
                     entriesWithSameKey.set(i, null);
                 }
@@ -211,7 +216,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 if (readType == ReadType.Replay) {
                     for (int i = 0; i < messagesForC; i++) {
                         Entry entry = entriesWithSameKey.get(i);
-                        messagesToRedeliver.remove(entry.getLedgerId(), 
entry.getEntryId());
+                        redeliveryMessages.remove(entry.getLedgerId(), 
entry.getEntryId());
                     }
                 }
 
@@ -279,12 +284,19 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         }
     }
 
-    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, 
List<Entry> entries, int maxMessages, ReadType readType) {
+    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, 
List<Entry> entries, int maxMessages,
+            ReadType readType, Set<Integer> stickyKeyHashes) {
         if (maxMessages == 0) {
             // the consumer was stuck
             nextStuckConsumers.add(consumer);
             return 0;
         }
+        if (readType == ReadType.Normal && stickyKeyHashes != null
+                && 
redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
+            // If redeliveryMessages contains messages that correspond to the 
same hash as the messages
+            // that the dispatcher is trying to send, do not send those 
messages, so that per-key ordering is preserved
+            return 0;
+        }
         if (recentlyJoinedConsumers == null) {
             return maxMessages;
         }
@@ -362,6 +374,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         return hasConsumerRemovedFromTheRecentJoinedConsumers;
     }
 
+    @Override
     protected synchronized Set<PositionImpl> getMessagesToReplayNow(int 
maxMessagesToRead) {
         if (isDispatcherStuckOnReplays) {
             // If we're stuck on replay, we want to move forward reading on 
the topic (until the overall max-unacked
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index c7344d6..43645bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -80,7 +81,6 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -1571,9 +1571,10 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         PersistentSubscription subRef = topicRef.getSubscription(subName);
         PersistentDispatcherMultipleConsumers dispatcher = 
(PersistentDispatcherMultipleConsumers) subRef
                 .getDispatcher();
-        Field replayMap = 
PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToRedeliver");
-        replayMap.setAccessible(true);
-        ConcurrentLongPairSet messagesToReplay = new ConcurrentLongPairSet(64, 
1);
+        Field redeliveryMessagesField = 
PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("redeliveryMessages");
+        redeliveryMessagesField.setAccessible(true);
+        MessageRedeliveryController redeliveryMessages = new 
MessageRedeliveryController(true);
 
         assertNotNull(subRef);
 
@@ -1594,24 +1595,24 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
             }
             if (i < replayIndex) {
                 // (3) accumulate acked messages for replay
-                messagesToReplay.add(msgId.getLedgerId(), msgId.getEntryId());
+                redeliveryMessages.add(msgId.getLedgerId(), 
msgId.getEntryId());
             }
         }
 
         // (4) redelivery : should redeliver only unacked messages
         Thread.sleep(1000);
 
-        replayMap.set(dispatcher, messagesToReplay);
+        redeliveryMessagesField.set(dispatcher, redeliveryMessages);
         // (a) redelivery with all acked-message should clear messageReply 
bucket
         
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
         Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
-            return messagesToReplay.isEmpty();
+            return redeliveryMessages.isEmpty();
         });
-        assertEquals(messagesToReplay.size(), 0);
+        assertTrue(redeliveryMessages.isEmpty());
 
         // (b) fill messageReplyBucket with already acked entry again: and try 
to publish new msg and read it
-        messagesToReplay.add(firstAckedMsg.getLedgerId(), 
firstAckedMsg.getEntryId());
-        replayMap.set(dispatcher, messagesToReplay);
+        redeliveryMessages.add(firstAckedMsg.getLedgerId(), 
firstAckedMsg.getEntryId());
+        redeliveryMessagesField.set(dispatcher, redeliveryMessages);
         // send new message
         final String testMsg = "testMsg";
         producer.send(testMsg.getBytes());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
new file mode 100644
index 0000000..9a785f6
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertEqualsNoOrder;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class MessageRedeliveryControllerTest {
+    @DataProvider(name = "allowOutOfOrderDelivery")
+    public Object[][] dataProvider() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000)
+    public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws 
Exception {
+        MessageRedeliveryController controller = new 
MessageRedeliveryController(allowOutOfOrderDelivery);
+
+        Field messagesToRedeliverField = 
MessageRedeliveryController.class.getDeclaredField("messagesToRedeliver");
+        messagesToRedeliverField.setAccessible(true);
+        LongPairSet messagesToRedeliver = (LongPairSet) 
messagesToRedeliverField.get(controller);
+
+        Field hashesToBeBlockedField = 
MessageRedeliveryController.class.getDeclaredField("hashesToBeBlocked");
+        hashesToBeBlockedField.setAccessible(true);
+        ConcurrentLongLongPairHashMap hashesToBeBlocked = 
(ConcurrentLongLongPairHashMap) hashesToBeBlockedField
+                .get(controller);
+
+        if (allowOutOfOrderDelivery) {
+            assertNull(hashesToBeBlocked);
+        } else {
+            assertNotNull(hashesToBeBlocked);
+        }
+
+        assertTrue(controller.isEmpty());
+        assertEquals(messagesToRedeliver.size(), 0);
+        if (!allowOutOfOrderDelivery) {
+            assertEquals(hashesToBeBlocked.size(), 0);
+        }
+
+        assertTrue(controller.add(1, 1));
+        assertTrue(controller.add(1, 2));
+        assertFalse(controller.add(1, 1));
+
+        assertFalse(controller.isEmpty());
+        assertEquals(messagesToRedeliver.size(), 2);
+        assertTrue(messagesToRedeliver.contains(1, 1));
+        assertTrue(messagesToRedeliver.contains(1, 2));
+        if (!allowOutOfOrderDelivery) {
+            assertEquals(hashesToBeBlocked.size(), 0);
+            assertFalse(hashesToBeBlocked.containsKey(1, 1));
+            assertFalse(hashesToBeBlocked.containsKey(1, 2));
+        }
+
+        assertTrue(controller.remove(1, 1));
+        assertTrue(controller.remove(1, 2));
+        assertFalse(controller.remove(1, 1));
+
+        assertTrue(controller.isEmpty());
+        assertEquals(messagesToRedeliver.size(), 0);
+        assertFalse(messagesToRedeliver.contains(1, 1));
+        assertFalse(messagesToRedeliver.contains(1, 2));
+        if (!allowOutOfOrderDelivery) {
+            assertEquals(hashesToBeBlocked.size(), 0);
+        }
+
+        assertTrue(controller.add(2, 1, 100));
+        assertTrue(controller.add(2, 2, 101));
+        assertTrue(controller.add(2, 3, 101));
+        assertFalse(controller.add(2, 1, 100));
+
+        assertFalse(controller.isEmpty());
+        assertEquals(messagesToRedeliver.size(), 3);
+        assertTrue(messagesToRedeliver.contains(2, 1));
+        assertTrue(messagesToRedeliver.contains(2, 2));
+        assertTrue(messagesToRedeliver.contains(2, 3));
+        if (!allowOutOfOrderDelivery) {
+            assertEquals(hashesToBeBlocked.size(), 3);
+            assertEquals(hashesToBeBlocked.get(2, 1).first, 100);
+            assertEquals(hashesToBeBlocked.get(2, 2).first, 101);
+            assertEquals(hashesToBeBlocked.get(2, 3).first, 101);
+        }
+
+        controller.clear();
+        assertTrue(controller.isEmpty());
+        assertEquals(messagesToRedeliver.size(), 0);
+        assertTrue(messagesToRedeliver.isEmpty());
+        if (!allowOutOfOrderDelivery) {
+            assertEquals(hashesToBeBlocked.size(), 0);
+            assertTrue(hashesToBeBlocked.isEmpty());
+        }
+
+        controller.add(2, 2, 201);
+        controller.add(1, 3, 100);
+        controller.add(3, 1, 300);
+        controller.add(2, 1, 200);
+        controller.add(3, 2, 301);
+        controller.add(1, 2, 101);
+        controller.add(1, 1, 100);
+
+        controller.removeAllUpTo(1, 3);
+        assertEquals(messagesToRedeliver.size(), 4);
+        assertTrue(messagesToRedeliver.contains(2, 1));
+        assertTrue(messagesToRedeliver.contains(2, 2));
+        assertTrue(messagesToRedeliver.contains(3, 1));
+        assertTrue(messagesToRedeliver.contains(3, 2));
+        if (!allowOutOfOrderDelivery) {
+            assertEquals(hashesToBeBlocked.size(), 4);
+            assertEquals(hashesToBeBlocked.get(2, 1).first, 200);
+            assertEquals(hashesToBeBlocked.get(2, 2).first, 201);
+            assertEquals(hashesToBeBlocked.get(3, 1).first, 300);
+            assertEquals(hashesToBeBlocked.get(3, 2).first, 301);
+        }
+
+        controller.removeAllUpTo(3, 1);
+        assertEquals(messagesToRedeliver.size(), 1);
+        assertTrue(messagesToRedeliver.contains(3, 2));
+        if (!allowOutOfOrderDelivery) {
+            assertEquals(hashesToBeBlocked.size(), 1);
+            assertEquals(hashesToBeBlocked.get(3, 2).first, 301);
+        }
+
+        controller.removeAllUpTo(5, 10);
+        assertTrue(controller.isEmpty());
+        assertEquals(messagesToRedeliver.size(), 0);
+        if (!allowOutOfOrderDelivery) {
+            assertEquals(hashesToBeBlocked.size(), 0);
+        }
+    }
+
+    @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000)
+    public void testContainsStickyKeyHashes(boolean allowOutOfOrderDelivery) 
throws Exception {
+        MessageRedeliveryController controller = new 
MessageRedeliveryController(allowOutOfOrderDelivery);
+        controller.add(1, 1, 100);
+        controller.add(1, 2, 101);
+        controller.add(1, 3, 102);
+        controller.add(2, 2, 103);
+        controller.add(2, 1, 104);
+
+        if (allowOutOfOrderDelivery) {
+            
assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(100)));
+            
assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(101, 102, 103)));
+            
assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(104, 105)));
+        } else {
+            
assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(100)));
+            assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(101, 
102, 103)));
+            assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(104, 
105)));
+        }
+
+        assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet()));
+        assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(99)));
+        assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(105, 
106)));
+    }
+
+    @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000)
+    public void testGetMessagesToReplayNow(boolean allowOutOfOrderDelivery) 
throws Exception {
+        MessageRedeliveryController controller = new 
MessageRedeliveryController(allowOutOfOrderDelivery);
+        controller.add(2, 2);
+        controller.add(1, 3);
+        controller.add(3, 1);
+        controller.add(2, 1);
+        controller.add(3, 2);
+        controller.add(1, 2);
+        controller.add(1, 1);
+
+        if (allowOutOfOrderDelivery) {
+            // The entries are sorted by ledger ID but not by entry ID
+            PositionImpl[] actual1 = 
controller.getMessagesToReplayNow(3).toArray(new PositionImpl[3]);
+            PositionImpl[] expected1 = { PositionImpl.get(1, 1), 
PositionImpl.get(1, 2), PositionImpl.get(1, 3) };
+            assertEqualsNoOrder(actual1, expected1);
+        } else {
+            // The entries are completely sorted
+            Set<PositionImpl> actual2 = controller.getMessagesToReplayNow(6);
+            Set<PositionImpl> expected2 = new TreeSet<>();
+            expected2.add(PositionImpl.get(1, 1));
+            expected2.add(PositionImpl.get(1, 2));
+            expected2.add(PositionImpl.get(1, 3));
+            expected2.add(PositionImpl.get(2, 1));
+            expected2.add(PositionImpl.get(2, 2));
+            expected2.add(PositionImpl.get(3, 1));
+            assertEquals(actual2, expected2);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index a902ac2..041d740 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -22,13 +22,20 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelPromise;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.*;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.mockito.ArgumentCaptor;
@@ -43,13 +50,32 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anySet;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
@@ -279,6 +305,123 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         );
     }
 
+    @Test(timeOut = 30000)
+    public void testMessageRedelivery() throws Exception {
+        final Queue<Position> actualEntriesToConsumer1 = new 
ConcurrentLinkedQueue<>();
+        final Queue<Position> actualEntriesToConsumer2 = new 
ConcurrentLinkedQueue<>();
+
+        final Queue<Position> expectedEntriesToConsumer1 = new 
ConcurrentLinkedQueue<>();
+        expectedEntriesToConsumer1.add(PositionImpl.get(1, 1));
+        final Queue<Position> expectedEntriesToConsumer2 = new 
ConcurrentLinkedQueue<>();
+        expectedEntriesToConsumer2.add(PositionImpl.get(1, 2));
+        expectedEntriesToConsumer2.add(PositionImpl.get(1, 3));
+
+        final AtomicInteger remainingEntriesNum = new AtomicInteger(
+                expectedEntriesToConsumer1.size() + 
expectedEntriesToConsumer2.size());
+
+        // Messages with key2 are routed to consumer1 and messages with key1 
are routed to consumer2
+        final List<Entry> allEntries = new ArrayList<>();
+        allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, 
"key2")));
+        allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, 
"key1")));
+        allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, 
"key1")));
+        allEntries.forEach(entry -> ((EntryImpl) entry).retain());
+
+        final List<Entry> redeliverEntries = new ArrayList<>();
+        redeliverEntries.add(allEntries.get(0)); // message1
+        final List<Entry> readEntries = new ArrayList<>();
+        readEntries.add(allEntries.get(2)); // message3
+
+        final Consumer consumer1 = mock(Consumer.class);
+        doReturn("consumer1").when(consumer1).consumerName();
+        // Change availablePermits of consumer1 to 0 and then back to normal
+        when(consumer1.getAvailablePermits()).thenReturn(0).thenReturn(10);
+        doReturn(true).when(consumer1).isWritable();
+        doAnswer(invocationOnMock -> {
+            @SuppressWarnings("unchecked")
+            List<Entry> entries = (List<Entry>) 
invocationOnMock.getArgument(0);
+            for (Entry entry : entries) {
+                remainingEntriesNum.decrementAndGet();
+                actualEntriesToConsumer1.add(entry.getPosition());
+            }
+            return channelMock;
+        }).when(consumer1).sendMessages(anyList(), any(EntryBatchSizes.class), 
any(EntryBatchIndexesAcks.class),
+                anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+
+        final Consumer consumer2 = mock(Consumer.class);
+        doReturn("consumer2").when(consumer2).consumerName();
+        when(consumer2.getAvailablePermits()).thenReturn(10);
+        doReturn(true).when(consumer2).isWritable();
+        doAnswer(invocationOnMock -> {
+            @SuppressWarnings("unchecked")
+            List<Entry> entries = (List<Entry>) 
invocationOnMock.getArgument(0);
+            for (Entry entry : entries) {
+                remainingEntriesNum.decrementAndGet();
+                actualEntriesToConsumer2.add(entry.getPosition());
+            }
+            return channelMock;
+        }).when(consumer2).sendMessages(anyList(), any(EntryBatchSizes.class), 
any(EntryBatchIndexesAcks.class),
+                anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+
+        persistentDispatcher.addConsumer(consumer1);
+        persistentDispatcher.addConsumer(consumer2);
+
+        final Field totalAvailablePermitsField = 
PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("totalAvailablePermits");
+        totalAvailablePermitsField.setAccessible(true);
+        totalAvailablePermitsField.set(persistentDispatcher, 1000);
+
+        final Field redeliveryMessagesField = 
PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("redeliveryMessages");
+        redeliveryMessagesField.setAccessible(true);
+        MessageRedeliveryController redeliveryMessages = 
(MessageRedeliveryController) redeliveryMessagesField
+                .get(persistentDispatcher);
+        redeliveryMessages.add(allEntries.get(0).getLedgerId(), 
allEntries.get(0).getEntryId(),
+                getStickyKeyHash(allEntries.get(0))); // message1
+        redeliveryMessages.add(allEntries.get(1).getLedgerId(), 
allEntries.get(1).getEntryId(),
+                getStickyKeyHash(allEntries.get(1))); // message2
+
+        // Mock Cursor#asyncReplayEntries
+        doAnswer(invocationOnMock -> {
+            @SuppressWarnings("unchecked")
+            Set<Position> positions = (Set<Position>) 
invocationOnMock.getArgument(0);
+            List<Entry> entries = allEntries.stream().filter(entry -> 
positions.contains(entry.getPosition()))
+                    .collect(Collectors.toList());
+            if (!entries.isEmpty()) {
+                ((PersistentStickyKeyDispatcherMultipleConsumers) 
invocationOnMock.getArgument(1))
+                        .readEntriesComplete(entries, 
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay);
+            }
+            return Collections.emptySet();
+        }).when(cursorMock).asyncReplayEntries(anySet(), 
any(PersistentStickyKeyDispatcherMultipleConsumers.class),
+                
eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), 
anyBoolean());
+
+        // Mock Cursor#asyncReadEntriesOrWait
+        doAnswer(invocationOnMock -> {
+            ((PersistentStickyKeyDispatcherMultipleConsumers) 
invocationOnMock.getArgument(2))
+                    .readEntriesComplete(readEntries, 
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+            return null;
+        }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(),
+                any(PersistentStickyKeyDispatcherMultipleConsumers.class),
+                
eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal));
+
+        // (1) Run sendMessagesToConsumers
+        // (2) Attempts to send message1 to consumer1 but skipped because 
availablePermits is 0
+        // (3) Change availablePermits of consumer1 to 10
+        // (4) Run readMoreEntries internally
+        // (5) Run sendMessagesToConsumers internally
+        // (6) Attempts to send message3 to consumer2 but skipped because 
redeliveryMessages contains message2
+        
persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay,
+                redeliverEntries);
+        while (remainingEntriesNum.get() > 0) {
+            // (7) Run readMoreEntries and resend message1 to consumer1 and 
message2-3 to consumer2
+            persistentDispatcher.readMoreEntries();
+        }
+
+        assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1);
+        assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2);
+
+        allEntries.forEach(entry -> entry.release());
+    }
+
     private ByteBuf createMessage(String message, int sequenceId) {
         return createMessage(message, sequenceId, "testKey");
     }
@@ -292,4 +435,9 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         messageMetadata.setPublishTime(System.currentTimeMillis());
         return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, 
messageMetadata.build(), Unpooled.copiedBuffer(message.getBytes(UTF_8)));
     }
+
+    private int getStickyKeyHash(Entry entry) {
+        byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), 
topicName, subscriptionName);
+        return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 1a0dac5..1cd0385 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -1529,6 +1530,23 @@ public class Commands {
         return res;
     }
 
+    private static final byte[] NONE_KEY = 
"NONE_KEY".getBytes(StandardCharsets.UTF_8);
+    public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String 
topic, String subscription) {
+        try {
+            int readerIdx = metadataAndPayload.readerIndex();
+            MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
+            metadataAndPayload.readerIndex(readerIdx);
+            if (metadata.hasOrderingKey()) {
+                return metadata.getOrderingKey().toByteArray();
+            } else if (metadata.hasPartitionKey()) {
+                return 
metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
+            }
+        } catch (Throwable t) {
+            log.error("[{}] [{}] Failed to peek sticky key from the message 
metadata", topic, subscription, t);
+        }
+        return Commands.NONE_KEY;
+    }
+
     // ---- transaction related ----
 
     public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) {

Reply via email to