This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d7e8ea16e66 [fix][broker] Introduce the last sent position to fix 
message ordering issues in Key_Shared (PIP-282) (#21953)
d7e8ea16e66 is described below

commit d7e8ea16e6682df9a9354cda25cf4f1f9cb54429
Author: Yuri Mizushima <[email protected]>
AuthorDate: Fri Jul 19 12:37:41 2024 +0900

    [fix][broker] Introduce the last sent position to fix message ordering 
issues in Key_Shared (PIP-282) (#21953)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  13 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   2 +-
 .../org/apache/pulsar/broker/service/Consumer.java |  10 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 195 ++++++++--
 .../service/persistent/PersistentSubscription.java |  19 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 196 +++++++++-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 357 ++++++++++++++++++
 .../pulsar/broker/stats/ConsumerStatsTest.java     |   2 +-
 .../client/api/KeySharedSubscriptionTest.java      | 399 ++++++++++++++++++++-
 .../pulsar/common/policies/data/ConsumerStats.java |   4 +-
 .../common/policies/data/SubscriptionStats.java    |   6 +
 .../policies/data/stats/ConsumerStatsImpl.java     |  10 +-
 .../policies/data/stats/SubscriptionStatsImpl.java |   6 +
 13 files changed, 1158 insertions(+), 61 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4ef9678f3e1..f99ee957e02 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -3472,6 +3472,19 @@ public class ManagedCursorImpl implements ManagedCursor {
         return individualDeletedMessages;
     }
 
+    public Position 
processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
+            LongPairRangeSet.RangeProcessor<Position> processor) {
+        final Position mdp;
+        lock.readLock().lock();
+        try {
+            mdp = markDeletePosition;
+            individualDeletedMessages.forEach(processor);
+        } finally {
+            lock.readLock().unlock();
+        }
+        return mdp;
+    }
+
     public boolean isMessageDeleted(Position position) {
         lock.readLock().lock();
         try {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b7734906f75..209bf57b24f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3497,7 +3497,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
      *            the position range
      * @return the count of entries
      */
-    long getNumberOfEntries(Range<Position> range) {
+    public long getNumberOfEntries(Range<Position> range) {
         Position fromPosition = range.lowerEndpoint();
         boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
         Position toPosition = range.upperEndpoint();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 02e21c44c91..dca64395d86 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -145,7 +145,7 @@ public class Consumer {
 
     private static final double avgPercent = 0.9;
     private boolean preciseDispatcherFlowControl;
-    private Position readPositionWhenJoining;
+    private Position lastSentPositionWhenJoining;
     private final String clientAddress; // IP address only, no port number 
included
     private final MessageId startMessageId;
     private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
@@ -931,8 +931,8 @@ public class Consumer {
         stats.unackedMessages = unackedMessages;
         stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
         stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
-        if (readPositionWhenJoining != null) {
-            stats.readPositionWhenJoining = readPositionWhenJoining.toString();
+        if (lastSentPositionWhenJoining != null) {
+            stats.lastSentPositionWhenJoining = 
lastSentPositionWhenJoining.toString();
         }
         return stats;
     }
@@ -1166,8 +1166,8 @@ public class Consumer {
         return preciseDispatcherFlowControl;
     }
 
-    public void setReadPositionWhenJoining(Position readPositionWhenJoining) {
-        this.readPositionWhenJoining = readPositionWhenJoining;
+    public void setLastSentPositionWhenJoining(Position 
lastSentPositionWhenJoining) {
+        this.lastSentPositionWhenJoining = lastSentPositionWhenJoining;
     }
 
     public int getMaxUnackedMessages() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 766f45ad990..91cec1f8e90 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -34,9 +34,12 @@ import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -55,6 +58,8 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.util.FutureUtil;
+import 
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,12 +78,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
      */
     private final LinkedHashMap<Consumer, Position> recentlyJoinedConsumers;
 
+    /**
+     * The lastSentPosition and the individuallySentPositions are not thread 
safe.
+     */
+    @Nullable
+    private Position lastSentPosition;
+    private final LongPairRangeSet<Position> individuallySentPositions;
+    private static final LongPairRangeSet.LongPairConsumer<Position> 
positionRangeConverter = PositionFactory::create;
+
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
             Subscription subscription, ServiceConfiguration conf, 
KeySharedMeta ksm) {
         super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
 
         this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
         this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new 
LinkedHashMap<>();
+        this.individuallySentPositions =
+                allowOutOfOrderDelivery ? null : new 
ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);
         this.keySharedMode = ksm.getKeySharedMode();
         switch (this.keySharedMode) {
         case AUTO_SPLIT:
@@ -124,15 +139,18 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 })
         ).thenRun(() -> {
             synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) 
{
-                Position readPositionWhenJoining = cursor.getReadPosition();
-                consumer.setReadPositionWhenJoining(readPositionWhenJoining);
-                // If this was the 1st consumer, or if all the messages are 
already acked, then we
-                // don't need to do anything special
-                if (!allowOutOfOrderDelivery
-                        && recentlyJoinedConsumers != null
-                        && consumerList.size() > 1
-                        && 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
-                    recentlyJoinedConsumers.put(consumer, 
readPositionWhenJoining);
+                if (!allowOutOfOrderDelivery) {
+                    final Position lastSentPositionWhenJoining = 
updateIfNeededAndGetLastSentPosition();
+                    if (lastSentPositionWhenJoining != null) {
+                        
consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
+                        // If this was the 1st consumer, or if all the 
messages are already acked, then we
+                        // don't need to do anything special
+                        if (recentlyJoinedConsumers != null
+                                && consumerList.size() > 1
+                                && 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+                            recentlyJoinedConsumers.put(consumer, 
lastSentPositionWhenJoining);
+                        }
+                    }
                 }
             }
         });
@@ -148,10 +166,16 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         // eventually causing all consumers to get stuck.
         selector.removeConsumer(consumer);
         super.removeConsumer(consumer);
-        if (recentlyJoinedConsumers != null) {
+        if (!allowOutOfOrderDelivery && recentlyJoinedConsumers != null) {
             recentlyJoinedConsumers.remove(consumer);
             if (consumerList.size() == 1) {
                 recentlyJoinedConsumers.clear();
+            } else if (consumerList.isEmpty()) {
+                // The subscription removes consumers if rewind or reset 
cursor operations are called.
+                // The dispatcher must clear lastSentPosition and 
individuallySentPositions because
+                // these operations trigger re-sending messages.
+                lastSentPosition = null;
+                individuallySentPositions.clear();
             }
             if (removeConsumersFromRecentJoinedConsumers() || 
!redeliveryMessages.isEmpty()) {
                 readMoreEntries();
@@ -193,9 +217,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             return false;
         }
 
-        // A corner case that we have to retry a readMoreEntries in order to 
preserver order delivery.
-        // This may happen when consumer closed. See issue #12885 for details.
         if (!allowOutOfOrderDelivery) {
+            // A corner case that we have to retry a readMoreEntries in order 
to preserver order delivery.
+            // This may happen when consumer closed. See issue #12885 for 
details.
             NavigableSet<Position> messagesToReplayNow = 
this.getMessagesToReplayNow(1);
             if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) 
{
                 Position replayPosition = messagesToReplayNow.first();
@@ -229,6 +253,24 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                     }
                 }
             }
+
+            // Update if the markDeletePosition move forward
+            updateIfNeededAndGetLastSentPosition();
+
+            // Should not access to individualDeletedMessages from outside 
managed cursor
+            // because it doesn't guarantee thread safety.
+            if (lastSentPosition == null) {
+                if (cursor.getMarkDeletedPosition() != null) {
+                    lastSentPosition = ((ManagedCursorImpl) cursor)
+                            
.processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(range -> {
+                                final Position lower = range.lowerEndpoint();
+                                final Position upper = range.upperEndpoint();
+                                
individuallySentPositions.addOpenClosed(lower.getLedgerId(), lower.getEntryId(),
+                                        upper.getLedgerId(), 
upper.getEntryId());
+                                return true;
+                            });
+                }
+            }
         }
 
         final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();
@@ -280,12 +322,24 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             }
 
             if (messagesForC > 0) {
-                // remove positions first from replay list first : 
sendMessages recycles entries
-                if (readType == ReadType.Replay) {
-                    for (int i = 0; i < messagesForC; i++) {
-                        Entry entry = entriesWithSameKey.get(i);
+                final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
+                for (int i = 0; i < messagesForC; i++) {
+                    final Entry entry = entriesWithSameKey.get(i);
+                    // remove positions first from replay list first : 
sendMessages recycles entries
+                    if (readType == ReadType.Replay) {
                         redeliveryMessages.remove(entry.getLedgerId(), 
entry.getEntryId());
                     }
+                    // Add positions to individuallySentPositions if necessary
+                    if (!allowOutOfOrderDelivery) {
+                        final Position position = entry.getPosition();
+                        // Store to individuallySentPositions even if 
lastSentPosition is null
+                        if ((lastSentPosition == null || 
position.compareTo(lastSentPosition) > 0)
+                                && 
!individuallySentPositions.contains(position.getLedgerId(), 
position.getEntryId())) {
+                            final Position previousPosition = 
managedLedger.getPreviousPosition(position);
+                            
individuallySentPositions.addOpenClosed(previousPosition.getLedgerId(),
+                                    previousPosition.getEntryId(), 
position.getLedgerId(), position.getEntryId());
+                        }
+                    }
                 }
 
                 SendMessageInfo sendMessageInfo = 
SendMessageInfo.getThreadLocal();
@@ -311,6 +365,61 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             }
         }
 
+        // Update the last sent position and remove ranges from 
individuallySentPositions if necessary
+        if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+            final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
+            com.google.common.collect.Range<Position> range = 
individuallySentPositions.firstRange();
+
+            // If the upper bound is before the last sent position, we need to 
move ahead as these
+            // individuallySentPositions are now irrelevant.
+            if (range != null && 
range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
+                
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+                        lastSentPosition.getEntryId());
+                range = individuallySentPositions.firstRange();
+            }
+
+            if (range != null) {
+                // If the lowerBound is ahead of the last sent position,
+                // verify if there are any entries in-between.
+                if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 || 
managedLedger
+                        
.getNumberOfEntries(com.google.common.collect.Range.openClosed(lastSentPosition,
+                                range.lowerEndpoint())) <= 0) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Found a position range to last sent: 
{}", name, range);
+                    }
+                    Position newLastSentPosition = range.upperEndpoint();
+                    Position positionAfterNewLastSent = managedLedger
+                            .getNextValidPosition(newLastSentPosition);
+                    // sometime ranges are connected but belongs to different 
ledgers
+                    // so, they are placed sequentially
+                    // eg: (2:10..3:15] can be returned as 
(2:10..2:15],[3:0..3:15].
+                    // So, try to iterate over connected range and found the 
last non-connected range
+                    // which gives new last sent position.
+                    final Position lastConfirmedEntrySnapshot = 
managedLedger.getLastConfirmedEntry();
+                    if (lastConfirmedEntrySnapshot != null) {
+                        while 
(positionAfterNewLastSent.compareTo(lastConfirmedEntrySnapshot) <= 0) {
+                            if 
(individuallySentPositions.contains(positionAfterNewLastSent.getLedgerId(),
+                                    positionAfterNewLastSent.getEntryId())) {
+                                range = 
individuallySentPositions.rangeContaining(
+                                        
positionAfterNewLastSent.getLedgerId(), positionAfterNewLastSent.getEntryId());
+                                newLastSentPosition = range.upperEndpoint();
+                                positionAfterNewLastSent = 
managedLedger.getNextValidPosition(newLastSentPosition);
+                                // check if next valid position is also 
deleted and part of the deleted-range
+                                continue;
+                            }
+                            break;
+                        }
+                    }
+
+                    if (lastSentPosition.compareTo(newLastSentPosition) < 0) {
+                        lastSentPosition = newLastSentPosition;
+                    }
+                    
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+                            lastSentPosition.getEntryId());
+                }
+            }
+        }
+
         // acquire message-dispatch permits for already delivered messages
         acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, 
totalMessagesSent, totalBytesSent);
 
@@ -351,10 +460,10 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             return maxMessages;
         }
         removeConsumersFromRecentJoinedConsumers();
-        Position maxReadPosition = recentlyJoinedConsumers.get(consumer);
+        Position maxLastSentPosition = recentlyJoinedConsumers.get(consumer);
         // At this point, all the old messages were already consumed and this 
consumer
         // is now ready to receive any message
-        if (maxReadPosition == null) {
+        if (maxLastSentPosition == null) {
             // The consumer has not recently joined, so we can send all 
messages
             return maxMessages;
         }
@@ -373,16 +482,16 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         // But the message [2,3] should not dispatch to consumer2.
 
         if (readType == ReadType.Replay) {
-            Position minReadPositionForRecentJoinedConsumer = 
recentlyJoinedConsumers.values().iterator().next();
-            if (minReadPositionForRecentJoinedConsumer != null
-                    && 
minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) {
-                maxReadPosition = minReadPositionForRecentJoinedConsumer;
+            Position minLastSentPositionForRecentJoinedConsumer = 
recentlyJoinedConsumers.values().iterator().next();
+            if (minLastSentPositionForRecentJoinedConsumer != null
+                    && 
minLastSentPositionForRecentJoinedConsumer.compareTo(maxLastSentPosition) < 0) {
+                maxLastSentPosition = 
minLastSentPositionForRecentJoinedConsumer;
             }
         }
         // Here, the consumer is one that has recently joined, so we can only 
send messages that were
         // published before it has joined.
         for (int i = 0; i < maxMessages; i++) {
-            if ((entries.get(i)).compareTo(maxReadPosition) >= 0) {
+            if ((entries.get(i)).compareTo(maxLastSentPosition) > 0) {
                 // We have already crossed the divider line. All messages in 
the list are now
                 // newer than what we can currently dispatch to this consumer
                 return i;
@@ -416,11 +525,9 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
         Position mdp = cursor.getMarkDeletedPosition();
         if (mdp != null) {
-            Position nextPositionOfTheMarkDeletePosition =
-                    ((ManagedLedgerImpl) 
cursor.getManagedLedger()).getNextValidPosition(mdp);
             while (itr.hasNext()) {
                 Map.Entry<Consumer, Position> entry = itr.next();
-                if 
(entry.getValue().compareTo(nextPositionOfTheMarkDeletePosition) <= 0) {
+                if (entry.getValue().compareTo(mdp) <= 0) {
                     itr.remove();
                     hasConsumerRemovedFromTheRecentJoinedConsumers = true;
                 } else {
@@ -431,6 +538,18 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return hasConsumerRemovedFromTheRecentJoinedConsumers;
     }
 
+    @Nullable
+    private synchronized Position updateIfNeededAndGetLastSentPosition() {
+        if (lastSentPosition == null) {
+            return null;
+        }
+        final Position mdp = cursor.getMarkDeletedPosition();
+        if (mdp != null && mdp.compareTo(lastSentPosition) > 0) {
+            lastSentPosition = mdp;
+        }
+        return lastSentPosition;
+    }
+
     @Override
     protected synchronized NavigableSet<Position> getMessagesToReplayNow(int 
maxMessagesToRead) {
         if (isDispatcherStuckOnReplays) {
@@ -551,6 +670,30 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return recentlyJoinedConsumers;
     }
 
+    public synchronized String getLastSentPosition() {
+        if (lastSentPosition == null) {
+            return null;
+        }
+        return lastSentPosition.toString();
+    }
+
+    @VisibleForTesting
+    public Position getLastSentPositionField() {
+        return lastSentPosition;
+    }
+
+    public synchronized String getIndividuallySentPositions() {
+        if (individuallySentPositions == null) {
+            return null;
+        }
+        return individuallySentPositions.toString();
+    }
+
+    @VisibleForTesting
+    public LongPairRangeSet<Position> getIndividuallySentPositionsField() {
+        return individuallySentPositions;
+    }
+
     public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
         return selector.getConsumerKeyHashRanges();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a1d51668ca8..77aa5f82c39 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1305,9 +1305,26 @@ public class PersistentSubscription extends 
AbstractSubscription {
                     .getRecentlyJoinedConsumers();
             if (recentlyJoinedConsumers != null && 
recentlyJoinedConsumers.size() > 0) {
                 recentlyJoinedConsumers.forEach((k, v) -> {
-                    
subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
+                    // The dispatcher allows same name consumers
+                    final StringBuilder stringBuilder = new StringBuilder();
+                    
stringBuilder.append("consumerName=").append(k.consumerName())
+                            .append(", consumerId=").append(k.consumerId());
+                    if (k.cnx() != null) {
+                        stringBuilder.append(", 
address=").append(k.cnx().clientAddress());
+                    }
+                    
subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(), 
v.toString());
                 });
             }
+            final String lastSentPosition = 
((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+                    .getLastSentPosition();
+            if (lastSentPosition != null) {
+                subStats.lastSentPosition = lastSentPosition;
+            }
+            final String individuallySentPositions = 
((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+                    .getIndividuallySentPositions();
+            if (individuallySentPositions != null) {
+                subStats.individuallySentPositions = individuallySentPositions;
+            }
         }
         subStats.nonContiguousDeletedMessagesRanges = 
cursor.getTotalNonContiguousDeletedMessagesRange();
         subStats.nonContiguousDeletedMessagesRangesSerializedSize =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 1c83941d6e7..5432b8a430d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -56,6 +58,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Response.Status;
@@ -65,6 +68,7 @@ import lombok.SneakyThrows;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -75,6 +79,8 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.broker.testcontext.SpyConfig;
 import org.apache.pulsar.client.admin.GetStatsOptions;
@@ -139,7 +145,10 @@ import 
org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicHashPositions;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import 
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.awaitility.Awaitility;
@@ -3449,8 +3458,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testGetReadPositionWhenJoining() throws Exception {
-        final String topic = 
"persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + 
UUID.randomUUID().toString();
+    public void testGetLastSentPositionWhenJoining() throws Exception {
+        final String topic = 
"persistent://prop-xyz/ns1/testGetLastSentPositionWhenJoining-" + 
UUID.randomUUID().toString();
         final String subName = "my-sub";
         @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -3458,34 +3467,189 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
                 .enableBatching(false)
                 .create();
 
+        @Cleanup
+        final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionName(subName)
+                .subscribe();
+
         final int messages = 10;
         MessageIdImpl messageId = null;
         for (int i = 0; i < messages; i++) {
             messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + 
i).getBytes());
+            consumer1.receive();
         }
 
-        List<Consumer<byte[]>> consumers = new ArrayList<>();
-        for (int i = 0; i < 2; i++) {
-            Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                    .topic(topic)
-                    .subscriptionType(SubscriptionType.Key_Shared)
-                    .subscriptionName(subName)
-                    .subscribe();
-            consumers.add(consumer);
-        }
+        @Cleanup
+        final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionName(subName)
+                .subscribe();
 
         TopicStats stats = admin.topics().getStats(topic);
         Assert.assertEquals(stats.getSubscriptions().size(), 1);
         SubscriptionStats subStats = stats.getSubscriptions().get(subName);
         Assert.assertNotNull(subStats);
         Assert.assertEquals(subStats.getConsumers().size(), 2);
-        ConsumerStats consumerStats = subStats.getConsumers().get(0);
-        Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
-                PositionFactory.create(messageId.getLedgerId(), 
messageId.getEntryId() + 1).toString());
+        ConsumerStats consumerStats = subStats.getConsumers().stream()
+                .filter(s -> 
s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get();
+        Assert.assertEquals(consumerStats.getLastSentPositionWhenJoining(),
+                PositionFactory.create(messageId.getLedgerId(), 
messageId.getEntryId()).toString());
+    }
+
+    @Test
+    public void testGetLastSentPosition() throws Exception {
+        final String topic = 
"persistent://prop-xyz/ns1/testGetLastSentPosition-" + 
UUID.randomUUID().toString();
+        final String subName = "my-sub";
+        @Cleanup
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        final AtomicInteger counter = new AtomicInteger();
+        @Cleanup
+        final Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionName(subName)
+                .messageListener((c, msg) -> {
+                    try {
+                        c.acknowledge(msg);
+                        counter.getAndIncrement();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                })
+                .subscribe();
+
+        TopicStats stats = admin.topics().getStats(topic);
+        Assert.assertEquals(stats.getSubscriptions().size(), 1);
+        SubscriptionStats subStats = stats.getSubscriptions().get(subName);
+        Assert.assertNotNull(subStats);
+        Assert.assertNull(subStats.getLastSentPosition());
 
-        for (Consumer<byte[]> consumer : consumers) {
-            consumer.close();
+        final int messages = 10;
+        MessageIdImpl messageId = null;
+        for (int i = 0; i < messages; i++) {
+            messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + 
i).getBytes());
         }
+
+        Awaitility.await().untilAsserted(() -> assertEquals(counter.get(), 
messages));
+
+        stats = admin.topics().getStats(topic);
+        Assert.assertEquals(stats.getSubscriptions().size(), 1);
+        subStats = stats.getSubscriptions().get(subName);
+        Assert.assertNotNull(subStats);
+        Assert.assertEquals(subStats.getLastSentPosition(), 
PositionFactory.create(messageId.getLedgerId(), 
messageId.getEntryId()).toString());
+    }
+
+    @Test
+    public void testGetIndividuallySentPositions() throws Exception {
+        // The producer sends messages with two types of keys.
+        // The dispatcher sends keyA messages to consumer1.
+        // Consumer1 will not receive any messages. Its receiver queue size is 
1.
+        // Consumer2 will receive and ack any messages immediately.
+
+        final String topic = 
"persistent://prop-xyz/ns1/testGetIndividuallySentPositions-" + 
UUID.randomUUID().toString();
+        final String subName = "my-sub";
+        @Cleanup
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        final String consumer1Name = "c1";
+        final String consumer2Name = "c2";
+
+        @Cleanup
+        final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .consumerName(consumer1Name)
+                .receiverQueueSize(1)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionName(subName)
+                .subscribe();
+
+        final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get().getSubscription(subName).getDispatcher();
+        final String keyA = "key-a";
+        final String keyB = "key-b";
+        final int hashA = 
Murmur3_32Hash.getInstance().makeHash(keyA.getBytes());
+
+        final Field selectorField = 
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
+        selectorField.setAccessible(true);
+        final StickyKeyConsumerSelector selector = 
spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
+        selectorField.set(dispatcher, selector);
+
+        // the selector returns consumer1 if keyA
+        doAnswer((invocationOnMock -> {
+            final int hash = invocationOnMock.getArgument(0);
+
+            final String consumerName = hash == hashA ? consumer1Name : 
consumer2Name;
+            return dispatcher.getConsumers().stream().filter(consumer -> 
consumer.consumerName().equals(consumerName)).findFirst().get();
+        })).when(selector).select(anyInt());
+
+        final AtomicInteger consumer2AckCounter = new AtomicInteger();
+        @Cleanup
+        final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .consumerName(consumer2Name)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionName(subName)
+                .messageListener((c, msg) -> {
+                    try {
+                        c.acknowledge(msg);
+                        consumer2AckCounter.getAndIncrement();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                })
+                .subscribe();
+
+        final LongPairRangeSet.LongPairConsumer<Position> 
positionRangeConverter = PositionFactory::create;
+        final LongPairRangeSet<Position> expectedIndividuallySentPositions = 
new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);
+
+        TopicStats stats = admin.topics().getStats(topic);
+        Assert.assertEquals(stats.getSubscriptions().size(), 1);
+        SubscriptionStats subStats = stats.getSubscriptions().get(subName);
+        Assert.assertNotNull(subStats);
+        Assert.assertEquals(subStats.getIndividuallySentPositions(), 
expectedIndividuallySentPositions.toString());
+
+        final Function<String, MessageIdImpl> sendFn = (key) -> {
+            try {
+                return (MessageIdImpl) 
producer.newMessage().key(key).value(("msg").getBytes()).send();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        final List<MessageIdImpl> messageIdList = new ArrayList<>();
+
+        // the dispatcher can send keyA message, but then consumer1's receiver 
queue will be full
+        messageIdList.add(sendFn.apply(keyA));
+
+        // the dispatcher can send messages other than keyA
+        messageIdList.add(sendFn.apply(keyA));
+        messageIdList.add(sendFn.apply(keyB));
+        messageIdList.add(sendFn.apply(keyA));
+        messageIdList.add(sendFn.apply(keyB));
+        messageIdList.add(sendFn.apply(keyB));
+
+        assertEquals(messageIdList.size(), 6);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(consumer2AckCounter.get(), 3));
+
+        // set expected value
+        
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(1).getLedgerId(),
 messageIdList.get(1).getEntryId(),
+                messageIdList.get(2).getLedgerId(), 
messageIdList.get(2).getEntryId());
+        
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(3).getLedgerId(),
 messageIdList.get(3).getEntryId(),
+                messageIdList.get(5).getLedgerId(), 
messageIdList.get(5).getEntryId());
+
+        stats = admin.topics().getStats(topic);
+        Assert.assertEquals(stats.getSubscriptions().size(), 1);
+        subStats = stats.getSubscriptions().get(subName);
+        Assert.assertNotNull(subStats);
+        Assert.assertEquals(subStats.getIndividuallySentPositions(), 
expectedIndividuallySentPositions.toString());
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index a70b3ce7a42..1a205d0f686 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -35,14 +35,19 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
@@ -50,12 +55,14 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -72,11 +79,14 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import 
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
@@ -84,6 +94,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
 
     private PulsarService pulsarMock;
     private BrokerService brokerMock;
+    private ManagedLedgerImpl ledgerMock;
     private ManagedCursorImpl cursorMock;
     private Consumer consumerMock;
     private PersistentTopic topicMock;
@@ -135,9 +146,44 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         doReturn(topicName).when(topicMock).getName();
         doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
 
+        ledgerMock = mock(ManagedLedgerImpl.class);
+        doAnswer((invocationOnMock -> {
+            final Position position = invocationOnMock.getArgument(0);
+            if (position.getEntryId() > 0) {
+                return PositionFactory.create(position.getLedgerId(), 
position.getEntryId() - 1);
+            } else {
+                fail("Undefined behavior on mock");
+                return PositionFactory.EARLIEST;
+            }
+        })).when(ledgerMock).getPreviousPosition(any(Position.class));
+        doAnswer((invocationOnMock -> {
+            final Position position = invocationOnMock.getArgument(0);
+            return PositionFactory.create(position.getLedgerId(), 
position.getEntryId() < 0 ? 0 : position.getEntryId() + 1);
+        })).when(ledgerMock).getNextValidPosition(any(Position.class));
+        doAnswer((invocationOnMock -> {
+            final Range<Position> range = invocationOnMock.getArgument(0);
+            Position fromPosition = range.lowerEndpoint();
+            boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
+            Position toPosition = range.upperEndpoint();
+            boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
+
+            long count = 0;
+
+            if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
+                // If the 2 positions are in the same ledger
+                count = toPosition.getEntryId() - fromPosition.getEntryId() - 
1;
+                count += fromIncluded ? 1 : 0;
+                count += toIncluded ? 1 : 0;
+            } else {
+                fail("Undefined behavior on mock");
+            }
+            return count;
+        })).when(ledgerMock).getNumberOfEntries(any());
+
         cursorMock = mock(ManagedCursorImpl.class);
         doReturn(null).when(cursorMock).getLastIndividualDeletedRange();
         doReturn(subscriptionName).when(cursorMock).getName();
+        doReturn(ledgerMock).when(cursorMock).getManagedLedger();
 
         consumerMock = mock(Consumer.class);
         channelMock = mock(ChannelPromise.class);
@@ -465,6 +511,317 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         allEntries.forEach(entry -> entry.release());
     }
 
+
+
+    @DataProvider(name = "initializeLastSentPosition")
+    private Object[][] initialLastSentPositionProvider() {
+        return new Object[][] { { false }, { true } };
+    }
+
+    @Test(dataProvider = "initializeLastSentPosition")
+    public void testLastSentPositionAndIndividuallySentPositions(final boolean 
initializeLastSentPosition) throws Exception {
+        final Position initialLastSentPosition = PositionFactory.create(1, 10);
+        final LongPairRangeSet<Position> expectedIndividuallySentPositions
+                = new ConcurrentOpenLongPairRangeSet<>(4096, 
PositionFactory::create);
+
+        final Field lastSentPositionField = 
PersistentStickyKeyDispatcherMultipleConsumers.class
+                .getDeclaredField("lastSentPosition");
+        lastSentPositionField.setAccessible(true);
+        final LongPairRangeSet<Position> individuallySentPositions = 
persistentDispatcher.getIndividuallySentPositionsField();
+        final Supplier<Throwable> clearPosition = () -> {
+            try {
+                lastSentPositionField.set(persistentDispatcher, 
initializeLastSentPosition ? initialLastSentPosition : null);
+                individuallySentPositions.clear();
+                expectedIndividuallySentPositions.clear();
+            } catch (Throwable e) {
+                return e;
+            }
+            return null;
+        };
+        if (!initializeLastSentPosition) {
+            
doReturn(initialLastSentPosition).when(cursorMock).getMarkDeletedPosition();
+            doAnswer(invocationOnMock -> {
+                // skip copy operation
+                return initialLastSentPosition;
+            
}).when(cursorMock).processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(any());
+        }
+
+        // Assume the range sequence is [1:0, 1:19], [2:0, 2:19], ..., [10:0, 
10:19]
+        doAnswer((invocationOnMock -> {
+            final Position position = invocationOnMock.getArgument(0);
+            if (position.getEntryId() > 0) {
+                return PositionFactory.create(position.getLedgerId(), 
position.getEntryId() - 1);
+            } else if (position.getLedgerId() > 0) {
+                return PositionFactory.create(position.getLedgerId() - 1, 19);
+            } else {
+                throw new NullPointerException();
+            }
+        })).when(ledgerMock).getPreviousPosition(any(Position.class));
+        doAnswer((invocationOnMock -> {
+            final Position position = invocationOnMock.getArgument(0);
+            if (position.getEntryId() < 19) {
+                return PositionFactory.create(position.getLedgerId(), 
position.getEntryId() + 1);
+            } else {
+                return PositionFactory.create(position.getLedgerId() + 1, 0);
+            }
+        })).when(ledgerMock).getNextValidPosition(any(Position.class));
+        doReturn(PositionFactory.create(10, 
19)).when(ledgerMock).getLastConfirmedEntry();
+        doAnswer((invocationOnMock -> {
+            final Range<Position> range = invocationOnMock.getArgument(0);
+            Position fromPosition = range.lowerEndpoint();
+            boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
+            Position toPosition = range.upperEndpoint();
+            boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
+
+            if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
+                // If the 2 positions are in the same ledger
+                long count = toPosition.getEntryId() - 
fromPosition.getEntryId() - 1;
+                count += fromIncluded ? 1 : 0;
+                count += toIncluded ? 1 : 0;
+                return count;
+            } else {
+                long count = 0;
+                // If the from & to are pointing to different ledgers, then we 
need to :
+                // 1. Add the entries in the ledger pointed by toPosition
+                count += toPosition.getEntryId();
+                count += toIncluded ? 1 : 0;
+
+                // 2. Add the entries in the ledger pointed by fromPosition
+                count += 20 - (fromPosition.getEntryId() + 1);
+                count += fromIncluded ? 1 : 0;
+
+                // 3. Add the whole ledgers entries in between
+                for (long i = fromPosition.getLedgerId() + 1; i < 
toPosition.getLedgerId(); i++) {
+                    count += 20;
+                }
+
+                return count;
+            }
+        })).when(ledgerMock).getNumberOfEntries(any());
+        assertEquals(ledgerMock.getNextValidPosition(PositionFactory.create(1, 
0)), PositionFactory.create(1, 1));
+        assertEquals(ledgerMock.getNextValidPosition(PositionFactory.create(1, 
19)), PositionFactory.create(2, 0));
+        assertEquals(ledgerMock.getPreviousPosition(PositionFactory.create(2, 
0)), PositionFactory.create(1, 19));
+        assertThrows(NullPointerException.class, () -> 
ledgerMock.getPreviousPosition(PositionFactory.create(0, 0)));
+        assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+                PositionFactory.create(1, 0), PositionFactory.create(1, 0))), 
0);
+        assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+                PositionFactory.create(1, -1), PositionFactory.create(1, 9))), 
10);
+        assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+                PositionFactory.create(1, 19), PositionFactory.create(2, 
-1))), 0);
+        assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+                PositionFactory.create(1, 19), PositionFactory.create(2, 9))), 
10);
+        assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+                PositionFactory.create(1, -1), PositionFactory.create(3, 
19))), 60);
+
+        // Add a consumer
+        final Consumer consumer1 = mock(Consumer.class);
+        doReturn("consumer1").when(consumer1).consumerName();
+        when(consumer1.getAvailablePermits()).thenReturn(1000);
+        doReturn(true).when(consumer1).isWritable();
+        doReturn(channelMock).when(consumer1).sendMessages(anyList(), 
any(EntryBatchSizes.class),
+                any(EntryBatchIndexesAcks.class), anyInt(), anyLong(), 
anyLong(), any(RedeliveryTracker.class));
+        persistentDispatcher.addConsumer(consumer1);
+
+        /*
+         On single ledger
+         */
+
+        // Expected individuallySentPositions (isp): [(1:-1, 1:8]] (init) -> 
[(1:-1, 1:9]] (update) -> [] (remove)
+        // Expected lastSentPosition (lsp): 1:10 (init) -> 1:10 (remove)
+        // upper bound and the new entry are less than initial last sent 
position
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, -1, 1, 8);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 9, createMessage("test", 
1))), true);
+        assertTrue(individuallySentPositions.isEmpty());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
initialLastSentPosition.toString());
+
+        // isp: [(1:-1, 1:9]] -> [(1:-1, 1:10]] -> []
+        // lsp: 1:10 -> 1:10
+        // upper bound is less than initial last sent position
+        // upper bound and the new entry are less than or equal to initial 
last sent position
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, -1, 1, 9);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 10, createMessage("test", 
1))), true);
+        assertTrue(individuallySentPositions.isEmpty());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
initialLastSentPosition.toString());
+
+        // isp: [(1:-1, 1:2], (1:3, 1:4], (1:5, 1:6]] -> [(1:-1, 1:2], (1:3, 
1:4], (1:5, 1:6], (1:9, 1:10]] -> []
+        // lsp: 1:10 -> 1:10
+        // upper bound and the new entry are less than or equal to initial 
last sent position
+        // individually sent positions has multiple ranges
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, -1, 1, 2);
+        individuallySentPositions.addOpenClosed(1, 3, 1, 4);
+        individuallySentPositions.addOpenClosed(1, 5, 1, 6);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 10, createMessage("test", 
1))), true);
+        assertTrue(individuallySentPositions.isEmpty());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
initialLastSentPosition.toString());
+
+        // isp: [(1:-1, 1:10]] -> [(1:-1, 1:11]] -> []
+        // lsp: 1:10 -> 1:11
+        // upper bound is less than or equal to initial last sent position
+        // the new entry is next position of initial last sent position
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, -1, 1, 10);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 
1))), true);
+        assertTrue(individuallySentPositions.isEmpty());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
PositionFactory.create(1, 11).toString());
+
+        // isp: [(1:-1, 1:9]] -> [(1:-1, 1:9], (1:10, 1:11]] -> []
+        // lsp: 1:10 -> 1:11
+        // upper bound is less than initial last sent position
+        // the new entry is next position of initial last sent position
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, -1, 1, 9);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 
1))), true);
+        assertTrue(individuallySentPositions.isEmpty());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
PositionFactory.create(1, 11).toString());
+
+        // isp: [(1:11, 1:15]] -> [(1:10, 1:15]] -> []
+        // lsp: 1:10 -> 1:15
+        // upper bound is greater than initial last sent position
+        // the range doesn't contain next position of initial last sent 
position
+        // the new entry is next position of initial last sent position
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 
1))), true);
+        assertTrue(individuallySentPositions.isEmpty());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
PositionFactory.create(1, 15).toString());
+
+        // isp: [(1:11, 1:15]] -> [(1:10, 1:16]] -> []
+        // lsp: 1:10 -> 1:16
+        // upper bound is greater than initial last sent position
+        // the range doesn't contain next position of initial last sent 
position
+        // the new entries contain next position of initial last sent position
+        // first of the new entries is less than initial last sent position
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 9, createMessage("test", 1)),
+                        EntryImpl.create(1, 11, createMessage("test", 2)),
+                        EntryImpl.create(1, 16, createMessage("test", 3))), 
true);
+        assertTrue(individuallySentPositions.isEmpty());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
PositionFactory.create(1, 16).toString());
+
+        // isp: [(1:11, 1:15]] -> [(1:11, 1:15]] -> [(1:11, 1:15]]
+        // lsp: 1:10 -> 1:10
+        // upper bound is greater than initial last sent position
+        // the range doesn't contain next position of initial last sent 
position
+        // the new entry isn't  next position of initial last sent position
+        // the range contains the new entry
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+        expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 15);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 15, createMessage("test", 
1))), true);
+        assertEquals(individuallySentPositions.toString(), 
expectedIndividuallySentPositions.toString());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
initialLastSentPosition.toString());
+
+        // isp: [(1:11, 1:15]] -> [(1:11, 1:16]] -> [(1:11, 1:16]]
+        // lsp: 1:10 -> 1:10
+        // upper bound is greater than initial last sent position
+        // the range doesn't contain next position of initial last sent 
position
+        // the new entry isn't next position of initial last sent position
+        // the range doesn't contain the new entry
+        // the new entry is next position of upper bound
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+        expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 16);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 16, createMessage("test", 
1))), true);
+        assertEquals(individuallySentPositions.toString(), 
expectedIndividuallySentPositions.toString());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
initialLastSentPosition.toString());
+
+        // isp: [(1:11, 1:15]] -> [(1:11, 1:15], (1:16, 1:17]] -> [(1:11, 
1:15], (1:16, 1:17]]
+        // lsp: 1:10 -> 1:10
+        // upper bound is greater than initial last sent position
+        // the range doesn't contain next position of initial last sent 
position
+        // the new entry isn't next position of initial last sent position
+        // the range doesn't contain the new entry
+        // the new entry isn't next position of upper bound
+        // the new entry is same ledger
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+        expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 15);
+        expectedIndividuallySentPositions.addOpenClosed(1, 16, 1, 17);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 17, createMessage("test", 
1))), true);
+        assertEquals(individuallySentPositions.toString(), 
expectedIndividuallySentPositions.toString());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
initialLastSentPosition.toString());
+
+        /*
+        On multiple contiguous ledgers
+         */
+
+        // isp: [(1:11, 1:18]] -> [(1:11, 1:18], (2:-1, 2:0]] -> [(1:11, 
1:18], (2:-1, 2:0]]
+        // lsp: 1:10 -> 1:10
+        // upper bound is greater than initial last sent position
+        // the range doesn't contain next position of initial last sent 
position
+        // the new entry isn't next position of initial last sent position
+        // the range doesn't contain the new entry
+        // the new entry isn't next position of upper bound
+        // the new entry isn't same ledger
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, 11, 1, 18);
+        expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 18);
+        expectedIndividuallySentPositions.addOpenClosed(2, -1, 2, 0);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(2, 0, createMessage("test", 
1))), true);
+        assertEquals(individuallySentPositions.toString(), 
expectedIndividuallySentPositions.toString());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
initialLastSentPosition.toString());
+
+        // isp: [(1:11, 1:19], (2:-1, 2:0]] -> [(1:10, 1:19], (2:-1, 2:0]] -> 
[]
+        // lsp: 1:10 -> 2:0
+        // upper bound is greater than initial last sent position
+        // the range doesn't contain next position of initial last sent 
position
+        // the new entry is next position of initial last sent position
+        // the new entry isn't same ledger
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, 11, 1, 19);
+        individuallySentPositions.addOpenClosed(2, -1, 2, 0);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 
1))), true);
+        assertTrue(individuallySentPositions.isEmpty());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
PositionFactory.create(2, 0).toString());
+
+        // isp: [(1:11, 1:19], (2:-1, 2:19], (3:-1, 3:0]] -> [(1:10, 1:19], 
(2:-1, 2:19], (3:-1, 3:0]] -> []
+        // lsp: 1:10 -> 3:0
+        // upper bound is greater than initial last sent position
+        // the range doesn't contain next position of initial last sent 
position
+        // the new entry is next position of initial last sent position
+        // the new entry isn't same ledger
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, 11, 1, 19);
+        individuallySentPositions.addOpenClosed(2, -1, 2, 19);
+        individuallySentPositions.addOpenClosed(3, -1, 3, 0);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 
1))), true);
+        assertTrue(individuallySentPositions.isEmpty());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
PositionFactory.create(3, 0).toString());
+
+        // isp: [(1:11, 1:19], (2:-1, 2:0]] -> [(1:11, 1:19], (2:-1, 2:1]] -> 
[(1:11, 1:19], (2:-1, 2:1]]
+        // lsp: 1:10 -> 1:10
+        // upper bound is greater than initial last sent position
+        // the range doesn't contain next position of initial last sent 
position
+        // the new entry isn't next position of initial last sent position
+        // the new entry isn't same ledger
+        assertNull(clearPosition.get());
+        individuallySentPositions.addOpenClosed(1, 11, 1, 19);
+        individuallySentPositions.addOpenClosed(2, -1, 2, 0);
+        expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 19);
+        expectedIndividuallySentPositions.addOpenClosed(2, -1, 2, 1);
+        
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+                Arrays.asList(EntryImpl.create(2, 1, createMessage("test", 
1))), true);
+        assertEquals(individuallySentPositions.toString(), 
expectedIndividuallySentPositions.toString());
+        assertEquals(persistentDispatcher.getLastSentPosition(), 
initialLastSentPosition.toString());
+    }
+
     private ByteBuf createMessage(String message, int sequenceId) {
         return createMessage(message, sequenceId, "testKey");
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 5b2998216e8..14403765105 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -233,7 +233,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase 
{
                 "unackedMessages",
                 "avgMessagesPerEntry",
                 "blockedConsumerOnUnackedMsgs",
-                "readPositionWhenJoining",
+                "lastSentPositionWhenJoining",
                 "lastAckedTime",
                 "lastAckedTimestamp",
                 "lastConsumedTime",
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 92c51da64d3..e8fd5378316 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -26,6 +29,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -33,6 +37,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -49,6 +54,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.Position;
@@ -56,17 +62,24 @@ import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
+import 
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -1096,13 +1109,21 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         final String topicName = 
"persistent://public/default/change-allow-ooo-delivery-" + UUID.randomUUID();
         final String subName = "my-sub";
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+        final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionName(subName)
                 .subscriptionType(SubscriptionType.Key_Shared)
                 
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(true))
                 .subscribe();
 
+        @Cleanup
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+        producer.send("message".getBytes());
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(consumer1.receive(100, TimeUnit.MILLISECONDS)));
+
         CompletableFuture<Optional<Topic>> future = 
pulsar.getBrokerService().getTopicIfExists(topicName);
         assertTrue(future.isDone());
         assertTrue(future.get().isPresent());
@@ -1110,14 +1131,18 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
                 (PersistentStickyKeyDispatcherMultipleConsumers) 
topic.getSubscription(subName).getDispatcher();
         assertTrue(dispatcher.isAllowOutOfOrderDelivery());
-        consumer.close();
+        assertNull(dispatcher.getLastSentPositionField());
+        assertNull(dispatcher.getIndividuallySentPositionsField());
+        consumer1.close();
 
-        consumer = pulsarClient.newConsumer()
+        final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionName(subName)
                 .subscriptionType(SubscriptionType.Key_Shared)
                 
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
                 .subscribe();
+        producer.send("message".getBytes());
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(consumer2.receive(100, TimeUnit.MILLISECONDS)));
 
         future = pulsar.getBrokerService().getTopicIfExists(topicName);
         assertTrue(future.isDone());
@@ -1125,7 +1150,9 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         topic = future.get().get();
         dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) 
topic.getSubscription(subName).getDispatcher();
         assertFalse(dispatcher.isAllowOutOfOrderDelivery());
-        consumer.close();
+        assertNotNull(dispatcher.getLastSentPositionField());
+        assertNotNull(dispatcher.getIndividuallySentPositionsField());
+        consumer2.close();
     }
 
     @Test(timeOut = 30_000)
@@ -1199,6 +1226,370 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         l.await();
     }
 
+    @DataProvider(name = "preSend")
+    private Object[][] preSendProvider() {
+        return new Object[][] { { false }, { true } };
+    }
+
+    @Test(timeOut = 30_000, dataProvider = "preSend")
+    public void testCheckBetweenSkippingAndRecentlyJoinedConsumers(boolean 
preSend) throws Exception {
+        conf.setSubscriptionKeySharedUseConsistentHashing(true);
+
+        final String topicName = 
"persistent://public/default/recently-joined-consumers-" + UUID.randomUUID();
+        final String subName = "my-sub";
+
+        @Cleanup
+        final Producer<String> p = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+        if (preSend) {
+            // verify that the test succeeds even if the topic has a message
+            p.send("msg");
+        }
+
+        final Supplier<ConsumerBuilder<String>> cb = () -> 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()
+                        .setAllowOutOfOrderDelivery(false));
+
+        // create 2 consumers
+        final String c1ConsumerName = "c1";
+        @Cleanup
+        final Consumer<String> c1 = 
cb.get().consumerName(c1ConsumerName).receiverQueueSize(1).subscribe();
+        @Cleanup
+        final Consumer<String> c2 = 
cb.get().consumerName("c2").receiverQueueSize(1000).subscribe();
+
+        final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+        final Field recentlyJoinedConsumersField = 
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("recentlyJoinedConsumers");
+        recentlyJoinedConsumersField.setAccessible(true);
+        final LinkedHashMap<org.apache.pulsar.broker.service.Consumer, 
Position> recentlyJoinedConsumers = 
(LinkedHashMap<org.apache.pulsar.broker.service.Consumer, Position>) 
recentlyJoinedConsumersField.get(dispatcher);
+        final String keyA = "key-a";
+        final int hashA = 
Murmur3_32Hash.getInstance().makeHash(keyA.getBytes());
+        final Map<Integer, String> hashConsumerMap = new HashMap<>();
+        hashConsumerMap.put(hashA, c1.getConsumerName());
+
+        // enforce the selector will return c1 if keyA
+        final Field selectorField = 
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
+        selectorField.setAccessible(true);
+        final StickyKeyConsumerSelector selector = 
spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
+        selectorField.set(dispatcher, selector);
+        doAnswer((invocationOnMock -> {
+            final int hash = invocationOnMock.getArgument(0);
+            final String consumerName = hashConsumerMap.getOrDefault(hash, 
c2.getConsumerName());
+            return dispatcher.getConsumers().stream().filter(consumer -> 
consumer.consumerName().equals(consumerName)).findFirst().get();
+        })).when(selector).select(anyInt());
+
+        // send and receive
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c
 -> 
c.getConsumerName().equals(c1ConsumerName)).findFirst().get().getAvailablePermits(),
 1));
+        final MessageIdImpl msg0Id = (MessageIdImpl) 
p.newMessage().key(keyA).value("msg-0").send();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c
 -> 
c.getConsumerName().equals(c1ConsumerName)).findFirst().get().getAvailablePermits(),
 0));
+
+        final MessageIdImpl msg1Id = (MessageIdImpl) 
p.newMessage().key(keyA).value("msg-1").send();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(),
 2));
+
+        final Field redeliveryMessagesField = 
PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("redeliveryMessages");
+        redeliveryMessagesField.setAccessible(true);
+        final MessageRedeliveryController redeliveryMessages = 
(MessageRedeliveryController) redeliveryMessagesField.get(dispatcher);
+
+        final Set<Position> replayMsgSet = 
redeliveryMessages.getMessagesToReplayNow(3);
+        assertEquals(replayMsgSet.size(), 1);
+        final Position replayMsg = replayMsgSet.stream().findAny().get();
+        assertEquals(replayMsg, PositionFactory.create(msg1Id.getLedgerId(), 
msg1Id.getEntryId()));
+
+        // add c3
+        final String c3ConsumerName = "c3";
+        hashConsumerMap.put(hashA, c3ConsumerName);
+        @Cleanup
+        final Consumer<String> c3 = 
cb.get().consumerName(c3ConsumerName).subscribe();
+        final List<Message<String>> c3Msgs = new ArrayList<>();
+        final org.apache.pulsar.broker.service.Consumer c3Broker = 
dispatcher.getConsumers().stream().filter(consumer -> 
consumer.consumerName().equals(c3ConsumerName)).findFirst().get();
+        assertEquals(recentlyJoinedConsumers.get(c3Broker), 
PositionFactory.create(msg0Id.getLedgerId(), msg0Id.getEntryId()));
+
+        // None of messages are sent to c3.
+        Message<String> c3Msg = c3.receive(100, TimeUnit.MILLISECONDS);
+        assertNull(c3Msg);
+
+        // Disconnect c1
+        c1.close();
+
+        c3Msg = c3.receive(100, TimeUnit.MILLISECONDS);
+        assertNotNull(c3Msg);
+        c3Msgs.add(c3Msg);
+        // The mark delete position will move forward. Then remove c3 from 
recentlyJoinedConsumers.
+        c3.acknowledge(c3Msg);
+        Awaitility.await().untilAsserted(() -> 
assertNull(recentlyJoinedConsumers.get(c3Broker)));
+        c3Msg = c3.receive(100, TimeUnit.MILLISECONDS);
+        assertNotNull(c3Msg);
+        c3Msgs.add(c3Msg);
+        c3.acknowledge(c3Msg);
+
+        // check ordering
+        
assertTrue(c3Msgs.get(0).getMessageId().compareTo(c3Msgs.get(1).getMessageId()) 
< 0);
+    }
+
+    @Test(timeOut = 30_000)
+    public void testLastSentPositionWhenRecreatingDispatcher() throws 
Exception {
+        // The lastSentPosition and individuallySentPositions should be 
initialized
+        // by the markDeletedPosition and individuallyDeletedMessages.
+        final String topicName = "persistent://public/default/rewind-" + 
UUID.randomUUID();
+        final String subName = "my-sub";
+
+        final int numMessages = 9;
+        final List<String> keys = Arrays.asList("key-a", "key-b", "key-c");
+        final AtomicInteger receiveCounter = new AtomicInteger();
+        final AtomicInteger ackCounter = new AtomicInteger();
+
+        @Cleanup
+        final Producer<Integer> producer = 
pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        final Supplier<ConsumerBuilder<Integer>> cb = () -> 
pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()
+                        .setAllowOutOfOrderDelivery(false));
+
+        @Cleanup
+        final Consumer<Integer> c1 = cb.get().messageListener((c, msg) -> {
+            if (keys.get(0).equals(msg.getKey())) {
+                try {
+                    c.acknowledge(msg);
+                    ackCounter.getAndIncrement();
+                } catch (PulsarClientException e) {
+                    fail(e.getMessage());
+                }
+            }
+            receiveCounter.getAndIncrement();
+        }).subscribe();
+
+        PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+        LongPairRangeSet<Position> individuallySentPositionsField = 
dispatcher.getIndividuallySentPositionsField();
+        final ManagedCursorImpl cursor = (ManagedCursorImpl) 
((PersistentSubscription) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName)).getCursor();
+        final ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
cursor.getManagedLedger();
+
+        MessageIdImpl msgId = null;
+        for (int i = 0; i < numMessages; i++) {
+            msgId = (MessageIdImpl) producer.newMessage().key(keys.get(i % 
keys.size())).value(i).send();
+        }
+
+        // wait for consumption
+        Awaitility.await().untilAsserted(() -> 
assertEquals(receiveCounter.get(), numMessages));
+        assertEquals(ackCounter.get(), numMessages / keys.size());
+        assertEquals(dispatcher.getLastSentPositionField(), 
PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()));
+        assertTrue(individuallySentPositionsField.isEmpty());
+        receiveCounter.set(0);
+        ackCounter.set(0);
+
+        // create expected values
+        final Position expectedLastSentPosition = 
ledger.getNextValidPosition(cursor.getMarkDeletedPosition());
+        final ConcurrentOpenLongPairRangeSet<Position>
+                expectedIndividuallySentPositions =  new 
ConcurrentOpenLongPairRangeSet<>(4096, PositionFactory::create);
+        cursor.getIndividuallyDeletedMessagesSet().forEach(range -> {
+            final Position lower = range.lowerEndpoint();
+            final Position upper = range.upperEndpoint();
+            
expectedIndividuallySentPositions.addOpenClosed(lower.getLedgerId(), 
lower.getEntryId(), upper.getLedgerId(), upper.getEntryId());
+            return true;
+        });
+
+        // modify subscription type to close current dispatcher
+        admin.topics().createSubscription(topicName, "sub-alt", 
MessageId.earliest);
+        c1.close();
+        @Cleanup
+        final Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscribe();
+        c2.close();
+        
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getType(),
 SubscriptionType.Exclusive.toString());
+
+        @Cleanup
+        final Consumer<Integer> c3 = cb.get().receiverQueueSize(0).subscribe();
+        dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+        individuallySentPositionsField = 
dispatcher.getIndividuallySentPositionsField();
+
+        assertNull(dispatcher.getLastSentPositionField());
+        assertTrue(individuallySentPositionsField.isEmpty());
+
+        assertNotNull(c3.receive());
+
+        // validate the individuallySentPosition is initialized by the 
individuallyDeletedMessages
+        // if it is not initialized expectedly, it has sent-hole of key-c 
messages because key-c messages are not scheduled to be dispatched to some 
consumer(already acked).
+        assertEquals(dispatcher.getLastSentPositionField(), 
expectedLastSentPosition);
+        assertEquals(individuallySentPositionsField.toString(), 
expectedIndividuallySentPositions.toString());
+    }
+
+    @Test(timeOut = 30_000)
+    public void testLastSentPositionWhenResettingCursor() throws Exception {
+        // The lastSentPosition and individuallySentPositions should be 
cleared if reset-cursor operation is executed.
+        final String nsName = "public/default";
+        final String topicName = "persistent://" + nsName + "/reset-cursor-" + 
UUID.randomUUID();
+        final String subName = "my-sub";
+
+        final int numMessages = 10;
+        final List<String> keys = Arrays.asList("key-a", "key-b");
+        final AtomicInteger ackCounter = new AtomicInteger();
+
+        @Cleanup
+        final Producer<Integer> producer = 
pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        final Supplier<ConsumerBuilder<Integer>> cb = () -> 
pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .receiverQueueSize(0)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()
+                        .setAllowOutOfOrderDelivery(false));
+
+        @Cleanup
+        final Consumer<Integer> c1 = cb.get().consumerName("c1").subscribe();
+        @Cleanup
+        final Consumer<Integer> c2 = cb.get().consumerName("c2").subscribe();
+
+        // set retention policy
+        admin.namespaces().setRetention(nsName, new RetentionPolicies(1, 1024 
* 1024));
+
+        // enforce the selector will return c1 if keys.get(0)
+        final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+        final int hashA = 
Murmur3_32Hash.getInstance().makeHash(keys.get(0).getBytes());
+        final Map<Integer, String> hashConsumerMap = new HashMap<>();
+        hashConsumerMap.put(hashA, c1.getConsumerName());
+        final Field selectorField = 
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
+        selectorField.setAccessible(true);
+        final StickyKeyConsumerSelector selector = 
spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
+        selectorField.set(dispatcher, selector);
+        doAnswer((invocationOnMock -> {
+            final int hash = invocationOnMock.getArgument(0);
+            final String consumerName = hashConsumerMap.getOrDefault(hash, 
c2.getConsumerName());
+            return dispatcher.getConsumers().stream().filter(consumer -> 
consumer.consumerName().equals(consumerName)).findFirst().get();
+        })).when(selector).select(anyInt());
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().key(keys.get(i % 
keys.size())).value(i).send();
+        }
+
+        // consume some messages
+        for (int i = 0; i < numMessages / keys.size(); i++) {
+            final Message<Integer> msg = c2.receive();
+            if (msg != null) {
+                c2.acknowledge(msg);
+                ackCounter.getAndIncrement();
+            }
+        }
+        assertEquals(ackCounter.get(), numMessages / keys.size());
+
+        // store current lastSentPosition for comparison
+        final LongPairRangeSet<Position> individuallySentPositionsField = 
dispatcher.getIndividuallySentPositionsField();
+        assertNotNull(dispatcher.getLastSentPositionField());
+        assertFalse(individuallySentPositionsField.isEmpty());
+
+        // reset cursor and receive a message
+        admin.topics().resetCursor(topicName, subName, MessageId.earliest, 
true);
+
+        // validate the lastSentPosition and individuallySentPositions are 
cleared after resetting cursor
+        assertNull(dispatcher.getLastSentPositionField());
+        assertTrue(individuallySentPositionsField.isEmpty());
+    }
+
+    @Test(timeOut = 30_000)
+    public void testLastSentPositionWhenSkipping() throws Exception {
+        // The lastSentPosition and individuallySentPositions should be 
updated if skip operation is executed.
+        // There are updated to follow the new markDeletedPosition.
+        final String topicName = "persistent://public/default/skip-" + 
UUID.randomUUID();
+        final String subName = "my-sub";
+
+        final int numMessages = 10;
+        final List<String> keys = Arrays.asList("key-a", "key-b");
+        final int numSkip = 2;
+        final AtomicInteger ackCounter = new AtomicInteger();
+
+        @Cleanup
+        final Producer<Integer> producer = 
pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        final Supplier<ConsumerBuilder<Integer>> cb = () -> 
pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()
+                        .setAllowOutOfOrderDelivery(false))
+                .receiverQueueSize(0);
+
+        @Cleanup
+        final Consumer<Integer> c1 = cb.get().consumerName("c1").subscribe();
+        @Cleanup
+        final Consumer<Integer> c2 = cb.get().consumerName("c2").subscribe();
+
+        // enforce the selector will return c1 if keys.get(0)
+        final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+        final int hashA = 
Murmur3_32Hash.getInstance().makeHash(keys.get(0).getBytes());
+        final Map<Integer, String> hashConsumerMap = new HashMap<>();
+        hashConsumerMap.put(hashA, c1.getConsumerName());
+        final Field selectorField = 
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
+        selectorField.setAccessible(true);
+        final StickyKeyConsumerSelector selector = 
spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
+        selectorField.set(dispatcher, selector);
+        doAnswer((invocationOnMock -> {
+            final int hash = invocationOnMock.getArgument(0);
+            final String consumerName = hashConsumerMap.getOrDefault(hash, 
c2.getConsumerName());
+            return dispatcher.getConsumers().stream().filter(consumer -> 
consumer.consumerName().equals(consumerName)).findFirst().get();
+        })).when(selector).select(anyInt());
+
+        final List<Position> positionList = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            final MessageIdImpl msgId = (MessageIdImpl) 
producer.newMessage().key(keys.get(i % keys.size())).value(i).send();
+            positionList.add(PositionFactory.create(msgId.getLedgerId(), 
msgId.getEntryId()));
+        }
+
+        // consume some messages
+        for (int i = 0; i < numSkip; i++) {
+            final Message<Integer> msg = c2.receive();
+            if (msg != null) {
+                c2.acknowledge(msg);
+                ackCounter.getAndIncrement();
+            }
+        }
+        assertEquals(ackCounter.get(), numSkip);
+        final ManagedCursorImpl managedCursor = ((ManagedCursorImpl) 
((PersistentSubscription) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName)).getCursor());
+        Awaitility.await().untilAsserted(() -> 
assertEquals(managedCursor.getIndividuallyDeletedMessagesSet().size(), 2));
+
+        // store current lastSentPosition for comparison
+        final Position lastSentPositionBeforeSkip = 
dispatcher.getLastSentPositionField();
+        final LongPairRangeSet<Position> individuallySentPositionsField = 
dispatcher.getIndividuallySentPositionsField();
+        assertNotNull(lastSentPositionBeforeSkip);
+        assertFalse(individuallySentPositionsField.isEmpty());
+
+        // skip messages and receive a message
+        admin.topics().skipMessages(topicName, subName, numSkip);
+        final MessageIdImpl msgIdAfterSkip = (MessageIdImpl) 
c1.receive().getMessageId();
+        final Position positionAfterSkip = 
PositionFactory.create(msgIdAfterSkip.getLedgerId(),
+                msgIdAfterSkip.getEntryId());
+        assertEquals(positionAfterSkip, positionList.get(4));
+
+        // validate the lastSentPosition is updated to the new 
markDeletedPosition
+        // validate the individuallySentPositions is updated expectedly 
(removeAtMost the new markDeletedPosition)
+        final Position lastSentPosition = 
dispatcher.getLastSentPositionField();
+        assertNotNull(lastSentPosition);
+        assertTrue(lastSentPosition.compareTo(lastSentPositionBeforeSkip) > 0);
+        assertEquals(lastSentPosition, positionList.get(4));
+        assertTrue(individuallySentPositionsField.isEmpty());
+    }
 
     private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String 
subscription) {
         if 
(TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index d2d3600df96..5f2cf7b209e 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -72,8 +72,8 @@ public interface ConsumerStats {
     /** Flag to verify if consumer is blocked due to reaching threshold of 
unacked messages. */
     boolean isBlockedConsumerOnUnackedMsgs();
 
-    /** The read position of the cursor when the consumer joining. */
-    String getReadPositionWhenJoining();
+    /** The last sent position of the cursor when the consumer joining. */
+    String getLastSentPositionWhenJoining();
 
     /** Address of this consumer. */
     String getAddress();
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index d4850adaa6f..cabef1ca960 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -118,6 +118,12 @@ public interface SubscriptionStats {
     /** This is for Key_Shared subscription to get the recentJoinedConsumers 
in the Key_Shared subscription. */
     Map<String, String> getConsumersAfterMarkDeletePosition();
 
+    /** The last sent position of the cursor. This is for Key_Shared 
subscription. */
+    String getLastSentPosition();
+
+    /** Set of individually sent ranges. This is for Key_Shared subscription. 
*/
+    String getIndividuallySentPositions();
+
     /** SubscriptionProperties (key/value strings) associated with this 
subscribe. */
     Map<String, String> getSubscriptionProperties();
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index de36b330b7f..b4c5d21e692 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -77,8 +77,8 @@ public class ConsumerStatsImpl implements ConsumerStats {
     /** Flag to verify if consumer is blocked due to reaching threshold of 
unacked messages. */
     public boolean blockedConsumerOnUnackedMsgs;
 
-    /** The read position of the cursor when the consumer joining. */
-    public String readPositionWhenJoining;
+    /** The last sent position of the cursor when the consumer joining. */
+    public String lastSentPositionWhenJoining;
 
     /** Address of this consumer. */
     private String address;
@@ -113,7 +113,7 @@ public class ConsumerStatsImpl implements ConsumerStats {
         this.availablePermits += stats.availablePermits;
         this.unackedMessages += stats.unackedMessages;
         this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs;
-        this.readPositionWhenJoining = stats.readPositionWhenJoining;
+        this.lastSentPositionWhenJoining = stats.lastSentPositionWhenJoining;
         return this;
     }
 
@@ -141,8 +141,8 @@ public class ConsumerStatsImpl implements ConsumerStats {
         this.clientVersion = clientVersion;
     }
 
-    public String getReadPositionWhenJoining() {
-        return readPositionWhenJoining;
+    public String getLastSentPositionWhenJoining() {
+        return lastSentPositionWhenJoining;
     }
 
     public String getLastAckedTime() {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index a8ea0060629..ab4d07c7ae4 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -126,6 +126,12 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
     /** This is for Key_Shared subscription to get the recentJoinedConsumers 
in the Key_Shared subscription. */
     public Map<String, String> consumersAfterMarkDeletePosition;
 
+    /** The last sent position of the cursor. This is for Key_Shared 
subscription. */
+    public String lastSentPosition;
+
+    /** Set of individually sent ranges. This is for Key_Shared subscription. 
*/
+    public String individuallySentPositions;
+
     /** The number of non-contiguous deleted messages ranges. */
     public int nonContiguousDeletedMessagesRanges;
 


Reply via email to