This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d7e8ea16e66 [fix][broker] Introduce the last sent position to fix
message ordering issues in Key_Shared (PIP-282) (#21953)
d7e8ea16e66 is described below
commit d7e8ea16e6682df9a9354cda25cf4f1f9cb54429
Author: Yuri Mizushima <[email protected]>
AuthorDate: Fri Jul 19 12:37:41 2024 +0900
[fix][broker] Introduce the last sent position to fix message ordering
issues in Key_Shared (PIP-282) (#21953)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 13 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +-
.../org/apache/pulsar/broker/service/Consumer.java | 10 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 195 ++++++++--
.../service/persistent/PersistentSubscription.java | 19 +-
.../apache/pulsar/broker/admin/AdminApiTest.java | 196 +++++++++-
...ntStickyKeyDispatcherMultipleConsumersTest.java | 357 ++++++++++++++++++
.../pulsar/broker/stats/ConsumerStatsTest.java | 2 +-
.../client/api/KeySharedSubscriptionTest.java | 399 ++++++++++++++++++++-
.../pulsar/common/policies/data/ConsumerStats.java | 4 +-
.../common/policies/data/SubscriptionStats.java | 6 +
.../policies/data/stats/ConsumerStatsImpl.java | 10 +-
.../policies/data/stats/SubscriptionStatsImpl.java | 6 +
13 files changed, 1158 insertions(+), 61 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4ef9678f3e1..f99ee957e02 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -3472,6 +3472,19 @@ public class ManagedCursorImpl implements ManagedCursor {
return individualDeletedMessages;
}
+ public Position
processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
+ LongPairRangeSet.RangeProcessor<Position> processor) {
+ final Position mdp;
+ lock.readLock().lock();
+ try {
+ mdp = markDeletePosition;
+ individualDeletedMessages.forEach(processor);
+ } finally {
+ lock.readLock().unlock();
+ }
+ return mdp;
+ }
+
public boolean isMessageDeleted(Position position) {
lock.readLock().lock();
try {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b7734906f75..209bf57b24f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3497,7 +3497,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
* the position range
* @return the count of entries
*/
- long getNumberOfEntries(Range<Position> range) {
+ public long getNumberOfEntries(Range<Position> range) {
Position fromPosition = range.lowerEndpoint();
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
Position toPosition = range.upperEndpoint();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 02e21c44c91..dca64395d86 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -145,7 +145,7 @@ public class Consumer {
private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
- private Position readPositionWhenJoining;
+ private Position lastSentPositionWhenJoining;
private final String clientAddress; // IP address only, no port number
included
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
@@ -931,8 +931,8 @@ public class Consumer {
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
- if (readPositionWhenJoining != null) {
- stats.readPositionWhenJoining = readPositionWhenJoining.toString();
+ if (lastSentPositionWhenJoining != null) {
+ stats.lastSentPositionWhenJoining =
lastSentPositionWhenJoining.toString();
}
return stats;
}
@@ -1166,8 +1166,8 @@ public class Consumer {
return preciseDispatcherFlowControl;
}
- public void setReadPositionWhenJoining(Position readPositionWhenJoining) {
- this.readPositionWhenJoining = readPositionWhenJoining;
+ public void setLastSentPositionWhenJoining(Position
lastSentPositionWhenJoining) {
+ this.lastSentPositionWhenJoining = lastSentPositionWhenJoining;
}
public int getMaxUnackedMessages() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 766f45ad990..91cec1f8e90 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -34,9 +34,12 @@ import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -55,6 +58,8 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.util.FutureUtil;
+import
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,12 +78,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
*/
private final LinkedHashMap<Consumer, Position> recentlyJoinedConsumers;
+ /**
+ * The lastSentPosition and the individuallySentPositions are not thread
safe.
+ */
+ @Nullable
+ private Position lastSentPosition;
+ private final LongPairRangeSet<Position> individuallySentPositions;
+ private static final LongPairRangeSet.LongPairConsumer<Position>
positionRangeConverter = PositionFactory::create;
+
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf,
KeySharedMeta ksm) {
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new
LinkedHashMap<>();
+ this.individuallySentPositions =
+ allowOutOfOrderDelivery ? null : new
ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);
this.keySharedMode = ksm.getKeySharedMode();
switch (this.keySharedMode) {
case AUTO_SPLIT:
@@ -124,15 +139,18 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
})
).thenRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this)
{
- Position readPositionWhenJoining = cursor.getReadPosition();
- consumer.setReadPositionWhenJoining(readPositionWhenJoining);
- // If this was the 1st consumer, or if all the messages are
already acked, then we
- // don't need to do anything special
- if (!allowOutOfOrderDelivery
- && recentlyJoinedConsumers != null
- && consumerList.size() > 1
- &&
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
- recentlyJoinedConsumers.put(consumer,
readPositionWhenJoining);
+ if (!allowOutOfOrderDelivery) {
+ final Position lastSentPositionWhenJoining =
updateIfNeededAndGetLastSentPosition();
+ if (lastSentPositionWhenJoining != null) {
+
consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
+ // If this was the 1st consumer, or if all the
messages are already acked, then we
+ // don't need to do anything special
+ if (recentlyJoinedConsumers != null
+ && consumerList.size() > 1
+ &&
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+ recentlyJoinedConsumers.put(consumer,
lastSentPositionWhenJoining);
+ }
+ }
}
}
});
@@ -148,10 +166,16 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// eventually causing all consumers to get stuck.
selector.removeConsumer(consumer);
super.removeConsumer(consumer);
- if (recentlyJoinedConsumers != null) {
+ if (!allowOutOfOrderDelivery && recentlyJoinedConsumers != null) {
recentlyJoinedConsumers.remove(consumer);
if (consumerList.size() == 1) {
recentlyJoinedConsumers.clear();
+ } else if (consumerList.isEmpty()) {
+ // The subscription removes consumers if rewind or reset
cursor operations are called.
+ // The dispatcher must clear lastSentPosition and
individuallySentPositions because
+ // these operations trigger re-sending messages.
+ lastSentPosition = null;
+ individuallySentPositions.clear();
}
if (removeConsumersFromRecentJoinedConsumers() ||
!redeliveryMessages.isEmpty()) {
readMoreEntries();
@@ -193,9 +217,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
return false;
}
- // A corner case that we have to retry a readMoreEntries in order to
preserver order delivery.
- // This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
+ // A corner case that we have to retry a readMoreEntries in order
to preserver order delivery.
+ // This may happen when consumer closed. See issue #12885 for
details.
NavigableSet<Position> messagesToReplayNow =
this.getMessagesToReplayNow(1);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty())
{
Position replayPosition = messagesToReplayNow.first();
@@ -229,6 +253,24 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
}
+
+ // Update if the markDeletePosition move forward
+ updateIfNeededAndGetLastSentPosition();
+
+ // Should not access to individualDeletedMessages from outside
managed cursor
+ // because it doesn't guarantee thread safety.
+ if (lastSentPosition == null) {
+ if (cursor.getMarkDeletedPosition() != null) {
+ lastSentPosition = ((ManagedCursorImpl) cursor)
+
.processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(range -> {
+ final Position lower = range.lowerEndpoint();
+ final Position upper = range.upperEndpoint();
+
individuallySentPositions.addOpenClosed(lower.getLedgerId(), lower.getEntryId(),
+ upper.getLedgerId(),
upper.getEntryId());
+ return true;
+ });
+ }
+ }
}
final Map<Consumer, List<Entry>> groupedEntries =
localGroupedEntries.get();
@@ -280,12 +322,24 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
if (messagesForC > 0) {
- // remove positions first from replay list first :
sendMessages recycles entries
- if (readType == ReadType.Replay) {
- for (int i = 0; i < messagesForC; i++) {
- Entry entry = entriesWithSameKey.get(i);
+ final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ for (int i = 0; i < messagesForC; i++) {
+ final Entry entry = entriesWithSameKey.get(i);
+ // remove positions first from replay list first :
sendMessages recycles entries
+ if (readType == ReadType.Replay) {
redeliveryMessages.remove(entry.getLedgerId(),
entry.getEntryId());
}
+ // Add positions to individuallySentPositions if necessary
+ if (!allowOutOfOrderDelivery) {
+ final Position position = entry.getPosition();
+ // Store to individuallySentPositions even if
lastSentPosition is null
+ if ((lastSentPosition == null ||
position.compareTo(lastSentPosition) > 0)
+ &&
!individuallySentPositions.contains(position.getLedgerId(),
position.getEntryId())) {
+ final Position previousPosition =
managedLedger.getPreviousPosition(position);
+
individuallySentPositions.addOpenClosed(previousPosition.getLedgerId(),
+ previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
+ }
+ }
}
SendMessageInfo sendMessageInfo =
SendMessageInfo.getThreadLocal();
@@ -311,6 +365,61 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
+ // Update the last sent position and remove ranges from
individuallySentPositions if necessary
+ if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+ final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ com.google.common.collect.Range<Position> range =
individuallySentPositions.firstRange();
+
+ // If the upper bound is before the last sent position, we need to
move ahead as these
+ // individuallySentPositions are now irrelevant.
+ if (range != null &&
range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
+
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+ lastSentPosition.getEntryId());
+ range = individuallySentPositions.firstRange();
+ }
+
+ if (range != null) {
+ // If the lowerBound is ahead of the last sent position,
+ // verify if there are any entries in-between.
+ if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 ||
managedLedger
+
.getNumberOfEntries(com.google.common.collect.Range.openClosed(lastSentPosition,
+ range.lowerEndpoint())) <= 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Found a position range to last sent:
{}", name, range);
+ }
+ Position newLastSentPosition = range.upperEndpoint();
+ Position positionAfterNewLastSent = managedLedger
+ .getNextValidPosition(newLastSentPosition);
+ // sometime ranges are connected but belongs to different
ledgers
+ // so, they are placed sequentially
+ // eg: (2:10..3:15] can be returned as
(2:10..2:15],[3:0..3:15].
+ // So, try to iterate over connected range and found the
last non-connected range
+ // which gives new last sent position.
+ final Position lastConfirmedEntrySnapshot =
managedLedger.getLastConfirmedEntry();
+ if (lastConfirmedEntrySnapshot != null) {
+ while
(positionAfterNewLastSent.compareTo(lastConfirmedEntrySnapshot) <= 0) {
+ if
(individuallySentPositions.contains(positionAfterNewLastSent.getLedgerId(),
+ positionAfterNewLastSent.getEntryId())) {
+ range =
individuallySentPositions.rangeContaining(
+
positionAfterNewLastSent.getLedgerId(), positionAfterNewLastSent.getEntryId());
+ newLastSentPosition = range.upperEndpoint();
+ positionAfterNewLastSent =
managedLedger.getNextValidPosition(newLastSentPosition);
+ // check if next valid position is also
deleted and part of the deleted-range
+ continue;
+ }
+ break;
+ }
+ }
+
+ if (lastSentPosition.compareTo(newLastSentPosition) < 0) {
+ lastSentPosition = newLastSentPosition;
+ }
+
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+ lastSentPosition.getEntryId());
+ }
+ }
+ }
+
// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries,
totalMessagesSent, totalBytesSent);
@@ -351,10 +460,10 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return maxMessages;
}
removeConsumersFromRecentJoinedConsumers();
- Position maxReadPosition = recentlyJoinedConsumers.get(consumer);
+ Position maxLastSentPosition = recentlyJoinedConsumers.get(consumer);
// At this point, all the old messages were already consumed and this
consumer
// is now ready to receive any message
- if (maxReadPosition == null) {
+ if (maxLastSentPosition == null) {
// The consumer has not recently joined, so we can send all
messages
return maxMessages;
}
@@ -373,16 +482,16 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// But the message [2,3] should not dispatch to consumer2.
if (readType == ReadType.Replay) {
- Position minReadPositionForRecentJoinedConsumer =
recentlyJoinedConsumers.values().iterator().next();
- if (minReadPositionForRecentJoinedConsumer != null
- &&
minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) {
- maxReadPosition = minReadPositionForRecentJoinedConsumer;
+ Position minLastSentPositionForRecentJoinedConsumer =
recentlyJoinedConsumers.values().iterator().next();
+ if (minLastSentPositionForRecentJoinedConsumer != null
+ &&
minLastSentPositionForRecentJoinedConsumer.compareTo(maxLastSentPosition) < 0) {
+ maxLastSentPosition =
minLastSentPositionForRecentJoinedConsumer;
}
}
// Here, the consumer is one that has recently joined, so we can only
send messages that were
// published before it has joined.
for (int i = 0; i < maxMessages; i++) {
- if ((entries.get(i)).compareTo(maxReadPosition) >= 0) {
+ if ((entries.get(i)).compareTo(maxLastSentPosition) > 0) {
// We have already crossed the divider line. All messages in
the list are now
// newer than what we can currently dispatch to this consumer
return i;
@@ -416,11 +525,9 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
Position mdp = cursor.getMarkDeletedPosition();
if (mdp != null) {
- Position nextPositionOfTheMarkDeletePosition =
- ((ManagedLedgerImpl)
cursor.getManagedLedger()).getNextValidPosition(mdp);
while (itr.hasNext()) {
Map.Entry<Consumer, Position> entry = itr.next();
- if
(entry.getValue().compareTo(nextPositionOfTheMarkDeletePosition) <= 0) {
+ if (entry.getValue().compareTo(mdp) <= 0) {
itr.remove();
hasConsumerRemovedFromTheRecentJoinedConsumers = true;
} else {
@@ -431,6 +538,18 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return hasConsumerRemovedFromTheRecentJoinedConsumers;
}
+ @Nullable
+ private synchronized Position updateIfNeededAndGetLastSentPosition() {
+ if (lastSentPosition == null) {
+ return null;
+ }
+ final Position mdp = cursor.getMarkDeletedPosition();
+ if (mdp != null && mdp.compareTo(lastSentPosition) > 0) {
+ lastSentPosition = mdp;
+ }
+ return lastSentPosition;
+ }
+
@Override
protected synchronized NavigableSet<Position> getMessagesToReplayNow(int
maxMessagesToRead) {
if (isDispatcherStuckOnReplays) {
@@ -551,6 +670,30 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return recentlyJoinedConsumers;
}
+ public synchronized String getLastSentPosition() {
+ if (lastSentPosition == null) {
+ return null;
+ }
+ return lastSentPosition.toString();
+ }
+
+ @VisibleForTesting
+ public Position getLastSentPositionField() {
+ return lastSentPosition;
+ }
+
+ public synchronized String getIndividuallySentPositions() {
+ if (individuallySentPositions == null) {
+ return null;
+ }
+ return individuallySentPositions.toString();
+ }
+
+ @VisibleForTesting
+ public LongPairRangeSet<Position> getIndividuallySentPositionsField() {
+ return individuallySentPositions;
+ }
+
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
return selector.getConsumerKeyHashRanges();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a1d51668ca8..77aa5f82c39 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1305,9 +1305,26 @@ public class PersistentSubscription extends
AbstractSubscription {
.getRecentlyJoinedConsumers();
if (recentlyJoinedConsumers != null &&
recentlyJoinedConsumers.size() > 0) {
recentlyJoinedConsumers.forEach((k, v) -> {
-
subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
+ // The dispatcher allows same name consumers
+ final StringBuilder stringBuilder = new StringBuilder();
+
stringBuilder.append("consumerName=").append(k.consumerName())
+ .append(", consumerId=").append(k.consumerId());
+ if (k.cnx() != null) {
+ stringBuilder.append(",
address=").append(k.cnx().clientAddress());
+ }
+
subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(),
v.toString());
});
}
+ final String lastSentPosition =
((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+ .getLastSentPosition();
+ if (lastSentPosition != null) {
+ subStats.lastSentPosition = lastSentPosition;
+ }
+ final String individuallySentPositions =
((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+ .getIndividuallySentPositions();
+ if (individuallySentPositions != null) {
+ subStats.individuallySentPositions = individuallySentPositions;
+ }
}
subStats.nonContiguousDeletedMessagesRanges =
cursor.getTotalNonContiguousDeletedMessagesRange();
subStats.nonContiguousDeletedMessagesRangesSerializedSize =
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 1c83941d6e7..5432b8a430d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -56,6 +58,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
@@ -65,6 +68,7 @@ import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarServerException;
@@ -75,6 +79,8 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.testcontext.SpyConfig;
import org.apache.pulsar.client.admin.GetStatsOptions;
@@ -139,7 +145,10 @@ import
org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicHashPositions;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.awaitility.Awaitility;
@@ -3449,8 +3458,8 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
}
@Test
- public void testGetReadPositionWhenJoining() throws Exception {
- final String topic =
"persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" +
UUID.randomUUID().toString();
+ public void testGetLastSentPositionWhenJoining() throws Exception {
+ final String topic =
"persistent://prop-xyz/ns1/testGetLastSentPositionWhenJoining-" +
UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -3458,34 +3467,189 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
.enableBatching(false)
.create();
+ @Cleanup
+ final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscriptionName(subName)
+ .subscribe();
+
final int messages = 10;
MessageIdImpl messageId = null;
for (int i = 0; i < messages; i++) {
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " +
i).getBytes());
+ consumer1.receive();
}
- List<Consumer<byte[]>> consumers = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- Consumer<byte[]> consumer = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionType(SubscriptionType.Key_Shared)
- .subscriptionName(subName)
- .subscribe();
- consumers.add(consumer);
- }
+ @Cleanup
+ final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscriptionName(subName)
+ .subscribe();
TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getConsumers().size(), 2);
- ConsumerStats consumerStats = subStats.getConsumers().get(0);
- Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
- PositionFactory.create(messageId.getLedgerId(),
messageId.getEntryId() + 1).toString());
+ ConsumerStats consumerStats = subStats.getConsumers().stream()
+ .filter(s ->
s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get();
+ Assert.assertEquals(consumerStats.getLastSentPositionWhenJoining(),
+ PositionFactory.create(messageId.getLedgerId(),
messageId.getEntryId()).toString());
+ }
+
+ @Test
+ public void testGetLastSentPosition() throws Exception {
+ final String topic =
"persistent://prop-xyz/ns1/testGetLastSentPosition-" +
UUID.randomUUID().toString();
+ final String subName = "my-sub";
+ @Cleanup
+ final Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+ final AtomicInteger counter = new AtomicInteger();
+ @Cleanup
+ final Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscriptionName(subName)
+ .messageListener((c, msg) -> {
+ try {
+ c.acknowledge(msg);
+ counter.getAndIncrement();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe();
+
+ TopicStats stats = admin.topics().getStats(topic);
+ Assert.assertEquals(stats.getSubscriptions().size(), 1);
+ SubscriptionStats subStats = stats.getSubscriptions().get(subName);
+ Assert.assertNotNull(subStats);
+ Assert.assertNull(subStats.getLastSentPosition());
- for (Consumer<byte[]> consumer : consumers) {
- consumer.close();
+ final int messages = 10;
+ MessageIdImpl messageId = null;
+ for (int i = 0; i < messages; i++) {
+ messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " +
i).getBytes());
}
+
+ Awaitility.await().untilAsserted(() -> assertEquals(counter.get(),
messages));
+
+ stats = admin.topics().getStats(topic);
+ Assert.assertEquals(stats.getSubscriptions().size(), 1);
+ subStats = stats.getSubscriptions().get(subName);
+ Assert.assertNotNull(subStats);
+ Assert.assertEquals(subStats.getLastSentPosition(),
PositionFactory.create(messageId.getLedgerId(),
messageId.getEntryId()).toString());
+ }
+
+ @Test
+ public void testGetIndividuallySentPositions() throws Exception {
+ // The producer sends messages with two types of keys.
+ // The dispatcher sends keyA messages to consumer1.
+ // Consumer1 will not receive any messages. Its receiver queue size is
1.
+ // Consumer2 will receive and ack any messages immediately.
+
+ final String topic =
"persistent://prop-xyz/ns1/testGetIndividuallySentPositions-" +
UUID.randomUUID().toString();
+ final String subName = "my-sub";
+ @Cleanup
+ final Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ final String consumer1Name = "c1";
+ final String consumer2Name = "c2";
+
+ @Cleanup
+ final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic)
+ .consumerName(consumer1Name)
+ .receiverQueueSize(1)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscriptionName(subName)
+ .subscribe();
+
+ final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+ (PersistentStickyKeyDispatcherMultipleConsumers)
pulsar.getBrokerService().getTopicIfExists(topic).get().get().getSubscription(subName).getDispatcher();
+ final String keyA = "key-a";
+ final String keyB = "key-b";
+ final int hashA =
Murmur3_32Hash.getInstance().makeHash(keyA.getBytes());
+
+ final Field selectorField =
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
+ selectorField.setAccessible(true);
+ final StickyKeyConsumerSelector selector =
spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
+ selectorField.set(dispatcher, selector);
+
+ // the selector returns consumer1 if keyA
+ doAnswer((invocationOnMock -> {
+ final int hash = invocationOnMock.getArgument(0);
+
+ final String consumerName = hash == hashA ? consumer1Name :
consumer2Name;
+ return dispatcher.getConsumers().stream().filter(consumer ->
consumer.consumerName().equals(consumerName)).findFirst().get();
+ })).when(selector).select(anyInt());
+
+ final AtomicInteger consumer2AckCounter = new AtomicInteger();
+ @Cleanup
+ final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topic)
+ .consumerName(consumer2Name)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscriptionName(subName)
+ .messageListener((c, msg) -> {
+ try {
+ c.acknowledge(msg);
+ consumer2AckCounter.getAndIncrement();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe();
+
+ final LongPairRangeSet.LongPairConsumer<Position>
positionRangeConverter = PositionFactory::create;
+ final LongPairRangeSet<Position> expectedIndividuallySentPositions =
new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);
+
+ TopicStats stats = admin.topics().getStats(topic);
+ Assert.assertEquals(stats.getSubscriptions().size(), 1);
+ SubscriptionStats subStats = stats.getSubscriptions().get(subName);
+ Assert.assertNotNull(subStats);
+ Assert.assertEquals(subStats.getIndividuallySentPositions(),
expectedIndividuallySentPositions.toString());
+
+ final Function<String, MessageIdImpl> sendFn = (key) -> {
+ try {
+ return (MessageIdImpl)
producer.newMessage().key(key).value(("msg").getBytes()).send();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ final List<MessageIdImpl> messageIdList = new ArrayList<>();
+
+ // the dispatcher can send keyA message, but then consumer1's receiver
queue will be full
+ messageIdList.add(sendFn.apply(keyA));
+
+ // the dispatcher can send messages other than keyA
+ messageIdList.add(sendFn.apply(keyA));
+ messageIdList.add(sendFn.apply(keyB));
+ messageIdList.add(sendFn.apply(keyA));
+ messageIdList.add(sendFn.apply(keyB));
+ messageIdList.add(sendFn.apply(keyB));
+
+ assertEquals(messageIdList.size(), 6);
+ Awaitility.await().untilAsserted(() ->
assertEquals(consumer2AckCounter.get(), 3));
+
+ // set expected value
+
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(1).getLedgerId(),
messageIdList.get(1).getEntryId(),
+ messageIdList.get(2).getLedgerId(),
messageIdList.get(2).getEntryId());
+
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(3).getLedgerId(),
messageIdList.get(3).getEntryId(),
+ messageIdList.get(5).getLedgerId(),
messageIdList.get(5).getEntryId());
+
+ stats = admin.topics().getStats(topic);
+ Assert.assertEquals(stats.getSubscriptions().size(), 1);
+ subStats = stats.getSubscriptions().get(subName);
+ Assert.assertNotNull(subStats);
+ Assert.assertEquals(subStats.getIndividuallySentPositions(),
expectedIndividuallySentPositions.toString());
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index a70b3ce7a42..1a205d0f686 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -35,14 +35,19 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
@@ -50,12 +55,14 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -72,11 +79,14 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -84,6 +94,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
private PulsarService pulsarMock;
private BrokerService brokerMock;
+ private ManagedLedgerImpl ledgerMock;
private ManagedCursorImpl cursorMock;
private Consumer consumerMock;
private PersistentTopic topicMock;
@@ -135,9 +146,44 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
doReturn(topicName).when(topicMock).getName();
doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
+ ledgerMock = mock(ManagedLedgerImpl.class);
+ doAnswer((invocationOnMock -> {
+ final Position position = invocationOnMock.getArgument(0);
+ if (position.getEntryId() > 0) {
+ return PositionFactory.create(position.getLedgerId(),
position.getEntryId() - 1);
+ } else {
+ fail("Undefined behavior on mock");
+ return PositionFactory.EARLIEST;
+ }
+ })).when(ledgerMock).getPreviousPosition(any(Position.class));
+ doAnswer((invocationOnMock -> {
+ final Position position = invocationOnMock.getArgument(0);
+ return PositionFactory.create(position.getLedgerId(),
position.getEntryId() < 0 ? 0 : position.getEntryId() + 1);
+ })).when(ledgerMock).getNextValidPosition(any(Position.class));
+ doAnswer((invocationOnMock -> {
+ final Range<Position> range = invocationOnMock.getArgument(0);
+ Position fromPosition = range.lowerEndpoint();
+ boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
+ Position toPosition = range.upperEndpoint();
+ boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
+
+ long count = 0;
+
+ if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
+ // If the 2 positions are in the same ledger
+ count = toPosition.getEntryId() - fromPosition.getEntryId() -
1;
+ count += fromIncluded ? 1 : 0;
+ count += toIncluded ? 1 : 0;
+ } else {
+ fail("Undefined behavior on mock");
+ }
+ return count;
+ })).when(ledgerMock).getNumberOfEntries(any());
+
cursorMock = mock(ManagedCursorImpl.class);
doReturn(null).when(cursorMock).getLastIndividualDeletedRange();
doReturn(subscriptionName).when(cursorMock).getName();
+ doReturn(ledgerMock).when(cursorMock).getManagedLedger();
consumerMock = mock(Consumer.class);
channelMock = mock(ChannelPromise.class);
@@ -465,6 +511,317 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
allEntries.forEach(entry -> entry.release());
}
+
+
+ @DataProvider(name = "initializeLastSentPosition")
+ private Object[][] initialLastSentPositionProvider() {
+ return new Object[][] { { false }, { true } };
+ }
+
+ @Test(dataProvider = "initializeLastSentPosition")
+ public void testLastSentPositionAndIndividuallySentPositions(final boolean
initializeLastSentPosition) throws Exception {
+ final Position initialLastSentPosition = PositionFactory.create(1, 10);
+ final LongPairRangeSet<Position> expectedIndividuallySentPositions
+ = new ConcurrentOpenLongPairRangeSet<>(4096,
PositionFactory::create);
+
+ final Field lastSentPositionField =
PersistentStickyKeyDispatcherMultipleConsumers.class
+ .getDeclaredField("lastSentPosition");
+ lastSentPositionField.setAccessible(true);
+ final LongPairRangeSet<Position> individuallySentPositions =
persistentDispatcher.getIndividuallySentPositionsField();
+ final Supplier<Throwable> clearPosition = () -> {
+ try {
+ lastSentPositionField.set(persistentDispatcher,
initializeLastSentPosition ? initialLastSentPosition : null);
+ individuallySentPositions.clear();
+ expectedIndividuallySentPositions.clear();
+ } catch (Throwable e) {
+ return e;
+ }
+ return null;
+ };
+ if (!initializeLastSentPosition) {
+
doReturn(initialLastSentPosition).when(cursorMock).getMarkDeletedPosition();
+ doAnswer(invocationOnMock -> {
+ // skip copy operation
+ return initialLastSentPosition;
+
}).when(cursorMock).processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(any());
+ }
+
+ // Assume the range sequence is [1:0, 1:19], [2:0, 2:19], ..., [10:0,
10:19]
+ doAnswer((invocationOnMock -> {
+ final Position position = invocationOnMock.getArgument(0);
+ if (position.getEntryId() > 0) {
+ return PositionFactory.create(position.getLedgerId(),
position.getEntryId() - 1);
+ } else if (position.getLedgerId() > 0) {
+ return PositionFactory.create(position.getLedgerId() - 1, 19);
+ } else {
+ throw new NullPointerException();
+ }
+ })).when(ledgerMock).getPreviousPosition(any(Position.class));
+ doAnswer((invocationOnMock -> {
+ final Position position = invocationOnMock.getArgument(0);
+ if (position.getEntryId() < 19) {
+ return PositionFactory.create(position.getLedgerId(),
position.getEntryId() + 1);
+ } else {
+ return PositionFactory.create(position.getLedgerId() + 1, 0);
+ }
+ })).when(ledgerMock).getNextValidPosition(any(Position.class));
+ doReturn(PositionFactory.create(10,
19)).when(ledgerMock).getLastConfirmedEntry();
+ doAnswer((invocationOnMock -> {
+ final Range<Position> range = invocationOnMock.getArgument(0);
+ Position fromPosition = range.lowerEndpoint();
+ boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
+ Position toPosition = range.upperEndpoint();
+ boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
+
+ if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
+ // If the 2 positions are in the same ledger
+ long count = toPosition.getEntryId() -
fromPosition.getEntryId() - 1;
+ count += fromIncluded ? 1 : 0;
+ count += toIncluded ? 1 : 0;
+ return count;
+ } else {
+ long count = 0;
+ // If the from & to are pointing to different ledgers, then we
need to :
+ // 1. Add the entries in the ledger pointed by toPosition
+ count += toPosition.getEntryId();
+ count += toIncluded ? 1 : 0;
+
+ // 2. Add the entries in the ledger pointed by fromPosition
+ count += 20 - (fromPosition.getEntryId() + 1);
+ count += fromIncluded ? 1 : 0;
+
+ // 3. Add the whole ledgers entries in between
+ for (long i = fromPosition.getLedgerId() + 1; i <
toPosition.getLedgerId(); i++) {
+ count += 20;
+ }
+
+ return count;
+ }
+ })).when(ledgerMock).getNumberOfEntries(any());
+ assertEquals(ledgerMock.getNextValidPosition(PositionFactory.create(1,
0)), PositionFactory.create(1, 1));
+ assertEquals(ledgerMock.getNextValidPosition(PositionFactory.create(1,
19)), PositionFactory.create(2, 0));
+ assertEquals(ledgerMock.getPreviousPosition(PositionFactory.create(2,
0)), PositionFactory.create(1, 19));
+ assertThrows(NullPointerException.class, () ->
ledgerMock.getPreviousPosition(PositionFactory.create(0, 0)));
+ assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+ PositionFactory.create(1, 0), PositionFactory.create(1, 0))),
0);
+ assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+ PositionFactory.create(1, -1), PositionFactory.create(1, 9))),
10);
+ assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+ PositionFactory.create(1, 19), PositionFactory.create(2,
-1))), 0);
+ assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+ PositionFactory.create(1, 19), PositionFactory.create(2, 9))),
10);
+ assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed(
+ PositionFactory.create(1, -1), PositionFactory.create(3,
19))), 60);
+
+ // Add a consumer
+ final Consumer consumer1 = mock(Consumer.class);
+ doReturn("consumer1").when(consumer1).consumerName();
+ when(consumer1.getAvailablePermits()).thenReturn(1000);
+ doReturn(true).when(consumer1).isWritable();
+ doReturn(channelMock).when(consumer1).sendMessages(anyList(),
any(EntryBatchSizes.class),
+ any(EntryBatchIndexesAcks.class), anyInt(), anyLong(),
anyLong(), any(RedeliveryTracker.class));
+ persistentDispatcher.addConsumer(consumer1);
+
+ /*
+ On single ledger
+ */
+
+ // Expected individuallySentPositions (isp): [(1:-1, 1:8]] (init) ->
[(1:-1, 1:9]] (update) -> [] (remove)
+ // Expected lastSentPosition (lsp): 1:10 (init) -> 1:10 (remove)
+ // upper bound and the new entry are less than initial last sent
position
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, -1, 1, 8);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 9, createMessage("test",
1))), true);
+ assertTrue(individuallySentPositions.isEmpty());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
initialLastSentPosition.toString());
+
+ // isp: [(1:-1, 1:9]] -> [(1:-1, 1:10]] -> []
+ // lsp: 1:10 -> 1:10
+ // upper bound is less than initial last sent position
+ // upper bound and the new entry are less than or equal to initial
last sent position
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, -1, 1, 9);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 10, createMessage("test",
1))), true);
+ assertTrue(individuallySentPositions.isEmpty());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
initialLastSentPosition.toString());
+
+ // isp: [(1:-1, 1:2], (1:3, 1:4], (1:5, 1:6]] -> [(1:-1, 1:2], (1:3,
1:4], (1:5, 1:6], (1:9, 1:10]] -> []
+ // lsp: 1:10 -> 1:10
+ // upper bound and the new entry are less than or equal to initial
last sent position
+ // individually sent positions has multiple ranges
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, -1, 1, 2);
+ individuallySentPositions.addOpenClosed(1, 3, 1, 4);
+ individuallySentPositions.addOpenClosed(1, 5, 1, 6);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 10, createMessage("test",
1))), true);
+ assertTrue(individuallySentPositions.isEmpty());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
initialLastSentPosition.toString());
+
+ // isp: [(1:-1, 1:10]] -> [(1:-1, 1:11]] -> []
+ // lsp: 1:10 -> 1:11
+ // upper bound is less than or equal to initial last sent position
+ // the new entry is next position of initial last sent position
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, -1, 1, 10);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 11, createMessage("test",
1))), true);
+ assertTrue(individuallySentPositions.isEmpty());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
PositionFactory.create(1, 11).toString());
+
+ // isp: [(1:-1, 1:9]] -> [(1:-1, 1:9], (1:10, 1:11]] -> []
+ // lsp: 1:10 -> 1:11
+ // upper bound is less than initial last sent position
+ // the new entry is next position of initial last sent position
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, -1, 1, 9);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 11, createMessage("test",
1))), true);
+ assertTrue(individuallySentPositions.isEmpty());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
PositionFactory.create(1, 11).toString());
+
+ // isp: [(1:11, 1:15]] -> [(1:10, 1:15]] -> []
+ // lsp: 1:10 -> 1:15
+ // upper bound is greater than initial last sent position
+ // the range doesn't contain next position of initial last sent
position
+ // the new entry is next position of initial last sent position
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 11, createMessage("test",
1))), true);
+ assertTrue(individuallySentPositions.isEmpty());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
PositionFactory.create(1, 15).toString());
+
+ // isp: [(1:11, 1:15]] -> [(1:10, 1:16]] -> []
+ // lsp: 1:10 -> 1:16
+ // upper bound is greater than initial last sent position
+ // the range doesn't contain next position of initial last sent
position
+ // the new entries contain next position of initial last sent position
+ // first of the new entries is less than initial last sent position
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 9, createMessage("test", 1)),
+ EntryImpl.create(1, 11, createMessage("test", 2)),
+ EntryImpl.create(1, 16, createMessage("test", 3))),
true);
+ assertTrue(individuallySentPositions.isEmpty());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
PositionFactory.create(1, 16).toString());
+
+ // isp: [(1:11, 1:15]] -> [(1:11, 1:15]] -> [(1:11, 1:15]]
+ // lsp: 1:10 -> 1:10
+ // upper bound is greater than initial last sent position
+ // the range doesn't contain next position of initial last sent
position
+ // the new entry isn't next position of initial last sent position
+ // the range contains the new entry
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+ expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 15);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 15, createMessage("test",
1))), true);
+ assertEquals(individuallySentPositions.toString(),
expectedIndividuallySentPositions.toString());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
initialLastSentPosition.toString());
+
+ // isp: [(1:11, 1:15]] -> [(1:11, 1:16]] -> [(1:11, 1:16]]
+ // lsp: 1:10 -> 1:10
+ // upper bound is greater than initial last sent position
+ // the range doesn't contain next position of initial last sent
position
+ // the new entry isn't next position of initial last sent position
+ // the range doesn't contain the new entry
+ // the new entry is next position of upper bound
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+ expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 16);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 16, createMessage("test",
1))), true);
+ assertEquals(individuallySentPositions.toString(),
expectedIndividuallySentPositions.toString());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
initialLastSentPosition.toString());
+
+ // isp: [(1:11, 1:15]] -> [(1:11, 1:15], (1:16, 1:17]] -> [(1:11,
1:15], (1:16, 1:17]]
+ // lsp: 1:10 -> 1:10
+ // upper bound is greater than initial last sent position
+ // the range doesn't contain next position of initial last sent
position
+ // the new entry isn't next position of initial last sent position
+ // the range doesn't contain the new entry
+ // the new entry isn't next position of upper bound
+ // the new entry is same ledger
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, 11, 1, 15);
+ expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 15);
+ expectedIndividuallySentPositions.addOpenClosed(1, 16, 1, 17);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 17, createMessage("test",
1))), true);
+ assertEquals(individuallySentPositions.toString(),
expectedIndividuallySentPositions.toString());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
initialLastSentPosition.toString());
+
+ /*
+ On multiple contiguous ledgers
+ */
+
+ // isp: [(1:11, 1:18]] -> [(1:11, 1:18], (2:-1, 2:0]] -> [(1:11,
1:18], (2:-1, 2:0]]
+ // lsp: 1:10 -> 1:10
+ // upper bound is greater than initial last sent position
+ // the range doesn't contain next position of initial last sent
position
+ // the new entry isn't next position of initial last sent position
+ // the range doesn't contain the new entry
+ // the new entry isn't next position of upper bound
+ // the new entry isn't same ledger
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, 11, 1, 18);
+ expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 18);
+ expectedIndividuallySentPositions.addOpenClosed(2, -1, 2, 0);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(2, 0, createMessage("test",
1))), true);
+ assertEquals(individuallySentPositions.toString(),
expectedIndividuallySentPositions.toString());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
initialLastSentPosition.toString());
+
+ // isp: [(1:11, 1:19], (2:-1, 2:0]] -> [(1:10, 1:19], (2:-1, 2:0]] ->
[]
+ // lsp: 1:10 -> 2:0
+ // upper bound is greater than initial last sent position
+ // the range doesn't contain next position of initial last sent
position
+ // the new entry is next position of initial last sent position
+ // the new entry isn't same ledger
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, 11, 1, 19);
+ individuallySentPositions.addOpenClosed(2, -1, 2, 0);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 11, createMessage("test",
1))), true);
+ assertTrue(individuallySentPositions.isEmpty());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
PositionFactory.create(2, 0).toString());
+
+ // isp: [(1:11, 1:19], (2:-1, 2:19], (3:-1, 3:0]] -> [(1:10, 1:19],
(2:-1, 2:19], (3:-1, 3:0]] -> []
+ // lsp: 1:10 -> 3:0
+ // upper bound is greater than initial last sent position
+ // the range doesn't contain next position of initial last sent
position
+ // the new entry is next position of initial last sent position
+ // the new entry isn't same ledger
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, 11, 1, 19);
+ individuallySentPositions.addOpenClosed(2, -1, 2, 19);
+ individuallySentPositions.addOpenClosed(3, -1, 3, 0);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(1, 11, createMessage("test",
1))), true);
+ assertTrue(individuallySentPositions.isEmpty());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
PositionFactory.create(3, 0).toString());
+
+ // isp: [(1:11, 1:19], (2:-1, 2:0]] -> [(1:11, 1:19], (2:-1, 2:1]] ->
[(1:11, 1:19], (2:-1, 2:1]]
+ // lsp: 1:10 -> 1:10
+ // upper bound is greater than initial last sent position
+ // the range doesn't contain next position of initial last sent
position
+ // the new entry isn't next position of initial last sent position
+ // the new entry isn't same ledger
+ assertNull(clearPosition.get());
+ individuallySentPositions.addOpenClosed(1, 11, 1, 19);
+ individuallySentPositions.addOpenClosed(2, -1, 2, 0);
+ expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 19);
+ expectedIndividuallySentPositions.addOpenClosed(2, -1, 2, 1);
+
persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal,
+ Arrays.asList(EntryImpl.create(2, 1, createMessage("test",
1))), true);
+ assertEquals(individuallySentPositions.toString(),
expectedIndividuallySentPositions.toString());
+ assertEquals(persistentDispatcher.getLastSentPosition(),
initialLastSentPosition.toString());
+ }
+
private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 5b2998216e8..14403765105 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -233,7 +233,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase
{
"unackedMessages",
"avgMessagesPerEntry",
"blockedConsumerOnUnackedMsgs",
- "readPositionWhenJoining",
+ "lastSentPositionWhenJoining",
"lastAckedTime",
"lastAckedTimestamp",
"lastConsumedTime",
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 92c51da64d3..e8fd5378316 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.client.api;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -26,6 +29,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -33,6 +37,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -49,6 +54,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
@@ -56,17 +62,24 @@ import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Topic;
import
org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
+import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
+import
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
@@ -1096,13 +1109,21 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
final String topicName =
"persistent://public/default/change-allow-ooo-delivery-" + UUID.randomUUID();
final String subName = "my-sub";
- Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(true))
.subscribe();
+ @Cleanup
+ final Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+ producer.send("message".getBytes());
+ Awaitility.await().untilAsserted(() ->
assertNotNull(consumer1.receive(100, TimeUnit.MILLISECONDS)));
+
CompletableFuture<Optional<Topic>> future =
pulsar.getBrokerService().getTopicIfExists(topicName);
assertTrue(future.isDone());
assertTrue(future.get().isPresent());
@@ -1110,14 +1131,18 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
(PersistentStickyKeyDispatcherMultipleConsumers)
topic.getSubscription(subName).getDispatcher();
assertTrue(dispatcher.isAllowOutOfOrderDelivery());
- consumer.close();
+ assertNull(dispatcher.getLastSentPositionField());
+ assertNull(dispatcher.getIndividuallySentPositionsField());
+ consumer1.close();
- consumer = pulsarClient.newConsumer()
+ final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
.subscribe();
+ producer.send("message".getBytes());
+ Awaitility.await().untilAsserted(() ->
assertNotNull(consumer2.receive(100, TimeUnit.MILLISECONDS)));
future = pulsar.getBrokerService().getTopicIfExists(topicName);
assertTrue(future.isDone());
@@ -1125,7 +1150,9 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
topic = future.get().get();
dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers)
topic.getSubscription(subName).getDispatcher();
assertFalse(dispatcher.isAllowOutOfOrderDelivery());
- consumer.close();
+ assertNotNull(dispatcher.getLastSentPositionField());
+ assertNotNull(dispatcher.getIndividuallySentPositionsField());
+ consumer2.close();
}
@Test(timeOut = 30_000)
@@ -1199,6 +1226,370 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
l.await();
}
+ @DataProvider(name = "preSend")
+ private Object[][] preSendProvider() {
+ return new Object[][] { { false }, { true } };
+ }
+
+ @Test(timeOut = 30_000, dataProvider = "preSend")
+ public void testCheckBetweenSkippingAndRecentlyJoinedConsumers(boolean
preSend) throws Exception {
+ conf.setSubscriptionKeySharedUseConsistentHashing(true);
+
+ final String topicName =
"persistent://public/default/recently-joined-consumers-" + UUID.randomUUID();
+ final String subName = "my-sub";
+
+ @Cleanup
+ final Producer<String> p = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+ if (preSend) {
+ // verify that the test succeeds even if the topic has a message
+ p.send("msg");
+ }
+
+ final Supplier<ConsumerBuilder<String>> cb = () ->
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()
+ .setAllowOutOfOrderDelivery(false));
+
+ // create 2 consumers
+ final String c1ConsumerName = "c1";
+ @Cleanup
+ final Consumer<String> c1 =
cb.get().consumerName(c1ConsumerName).receiverQueueSize(1).subscribe();
+ @Cleanup
+ final Consumer<String> c2 =
cb.get().consumerName("c2").receiverQueueSize(1000).subscribe();
+
+ final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+ (PersistentStickyKeyDispatcherMultipleConsumers)
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+ final Field recentlyJoinedConsumersField =
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("recentlyJoinedConsumers");
+ recentlyJoinedConsumersField.setAccessible(true);
+ final LinkedHashMap<org.apache.pulsar.broker.service.Consumer,
Position> recentlyJoinedConsumers =
(LinkedHashMap<org.apache.pulsar.broker.service.Consumer, Position>)
recentlyJoinedConsumersField.get(dispatcher);
+ final String keyA = "key-a";
+ final int hashA =
Murmur3_32Hash.getInstance().makeHash(keyA.getBytes());
+ final Map<Integer, String> hashConsumerMap = new HashMap<>();
+ hashConsumerMap.put(hashA, c1.getConsumerName());
+
+ // enforce the selector will return c1 if keyA
+ final Field selectorField =
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
+ selectorField.setAccessible(true);
+ final StickyKeyConsumerSelector selector =
spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
+ selectorField.set(dispatcher, selector);
+ doAnswer((invocationOnMock -> {
+ final int hash = invocationOnMock.getArgument(0);
+ final String consumerName = hashConsumerMap.getOrDefault(hash,
c2.getConsumerName());
+ return dispatcher.getConsumers().stream().filter(consumer ->
consumer.consumerName().equals(consumerName)).findFirst().get();
+ })).when(selector).select(anyInt());
+
+ // send and receive
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c
->
c.getConsumerName().equals(c1ConsumerName)).findFirst().get().getAvailablePermits(),
1));
+ final MessageIdImpl msg0Id = (MessageIdImpl)
p.newMessage().key(keyA).value("msg-0").send();
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c
->
c.getConsumerName().equals(c1ConsumerName)).findFirst().get().getAvailablePermits(),
0));
+
+ final MessageIdImpl msg1Id = (MessageIdImpl)
p.newMessage().key(keyA).value("msg-1").send();
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(),
2));
+
+ final Field redeliveryMessagesField =
PersistentDispatcherMultipleConsumers.class
+ .getDeclaredField("redeliveryMessages");
+ redeliveryMessagesField.setAccessible(true);
+ final MessageRedeliveryController redeliveryMessages =
(MessageRedeliveryController) redeliveryMessagesField.get(dispatcher);
+
+ final Set<Position> replayMsgSet =
redeliveryMessages.getMessagesToReplayNow(3);
+ assertEquals(replayMsgSet.size(), 1);
+ final Position replayMsg = replayMsgSet.stream().findAny().get();
+ assertEquals(replayMsg, PositionFactory.create(msg1Id.getLedgerId(),
msg1Id.getEntryId()));
+
+ // add c3
+ final String c3ConsumerName = "c3";
+ hashConsumerMap.put(hashA, c3ConsumerName);
+ @Cleanup
+ final Consumer<String> c3 =
cb.get().consumerName(c3ConsumerName).subscribe();
+ final List<Message<String>> c3Msgs = new ArrayList<>();
+ final org.apache.pulsar.broker.service.Consumer c3Broker =
dispatcher.getConsumers().stream().filter(consumer ->
consumer.consumerName().equals(c3ConsumerName)).findFirst().get();
+ assertEquals(recentlyJoinedConsumers.get(c3Broker),
PositionFactory.create(msg0Id.getLedgerId(), msg0Id.getEntryId()));
+
+ // None of messages are sent to c3.
+ Message<String> c3Msg = c3.receive(100, TimeUnit.MILLISECONDS);
+ assertNull(c3Msg);
+
+ // Disconnect c1
+ c1.close();
+
+ c3Msg = c3.receive(100, TimeUnit.MILLISECONDS);
+ assertNotNull(c3Msg);
+ c3Msgs.add(c3Msg);
+ // The mark delete position will move forward. Then remove c3 from
recentlyJoinedConsumers.
+ c3.acknowledge(c3Msg);
+ Awaitility.await().untilAsserted(() ->
assertNull(recentlyJoinedConsumers.get(c3Broker)));
+ c3Msg = c3.receive(100, TimeUnit.MILLISECONDS);
+ assertNotNull(c3Msg);
+ c3Msgs.add(c3Msg);
+ c3.acknowledge(c3Msg);
+
+ // check ordering
+
assertTrue(c3Msgs.get(0).getMessageId().compareTo(c3Msgs.get(1).getMessageId())
< 0);
+ }
+
+ @Test(timeOut = 30_000)
+ public void testLastSentPositionWhenRecreatingDispatcher() throws
Exception {
+ // The lastSentPosition and individuallySentPositions should be
initialized
+ // by the markDeletedPosition and individuallyDeletedMessages.
+ final String topicName = "persistent://public/default/rewind-" +
UUID.randomUUID();
+ final String subName = "my-sub";
+
+ final int numMessages = 9;
+ final List<String> keys = Arrays.asList("key-a", "key-b", "key-c");
+ final AtomicInteger receiveCounter = new AtomicInteger();
+ final AtomicInteger ackCounter = new AtomicInteger();
+
+ @Cleanup
+ final Producer<Integer> producer =
pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+
+ final Supplier<ConsumerBuilder<Integer>> cb = () ->
pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()
+ .setAllowOutOfOrderDelivery(false));
+
+ @Cleanup
+ final Consumer<Integer> c1 = cb.get().messageListener((c, msg) -> {
+ if (keys.get(0).equals(msg.getKey())) {
+ try {
+ c.acknowledge(msg);
+ ackCounter.getAndIncrement();
+ } catch (PulsarClientException e) {
+ fail(e.getMessage());
+ }
+ }
+ receiveCounter.getAndIncrement();
+ }).subscribe();
+
+ PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+ (PersistentStickyKeyDispatcherMultipleConsumers)
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+ LongPairRangeSet<Position> individuallySentPositionsField =
dispatcher.getIndividuallySentPositionsField();
+ final ManagedCursorImpl cursor = (ManagedCursorImpl)
((PersistentSubscription)
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName)).getCursor();
+ final ManagedLedgerImpl ledger = (ManagedLedgerImpl)
cursor.getManagedLedger();
+
+ MessageIdImpl msgId = null;
+ for (int i = 0; i < numMessages; i++) {
+ msgId = (MessageIdImpl) producer.newMessage().key(keys.get(i %
keys.size())).value(i).send();
+ }
+
+ // wait for consumption
+ Awaitility.await().untilAsserted(() ->
assertEquals(receiveCounter.get(), numMessages));
+ assertEquals(ackCounter.get(), numMessages / keys.size());
+ assertEquals(dispatcher.getLastSentPositionField(),
PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()));
+ assertTrue(individuallySentPositionsField.isEmpty());
+ receiveCounter.set(0);
+ ackCounter.set(0);
+
+ // create expected values
+ final Position expectedLastSentPosition =
ledger.getNextValidPosition(cursor.getMarkDeletedPosition());
+ final ConcurrentOpenLongPairRangeSet<Position>
+ expectedIndividuallySentPositions = new
ConcurrentOpenLongPairRangeSet<>(4096, PositionFactory::create);
+ cursor.getIndividuallyDeletedMessagesSet().forEach(range -> {
+ final Position lower = range.lowerEndpoint();
+ final Position upper = range.upperEndpoint();
+
expectedIndividuallySentPositions.addOpenClosed(lower.getLedgerId(),
lower.getEntryId(), upper.getLedgerId(), upper.getEntryId());
+ return true;
+ });
+
+ // modify subscription type to close current dispatcher
+ admin.topics().createSubscription(topicName, "sub-alt",
MessageId.earliest);
+ c1.close();
+ @Cleanup
+ final Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+ c2.close();
+
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getType(),
SubscriptionType.Exclusive.toString());
+
+ @Cleanup
+ final Consumer<Integer> c3 = cb.get().receiverQueueSize(0).subscribe();
+ dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers)
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+ individuallySentPositionsField =
dispatcher.getIndividuallySentPositionsField();
+
+ assertNull(dispatcher.getLastSentPositionField());
+ assertTrue(individuallySentPositionsField.isEmpty());
+
+ assertNotNull(c3.receive());
+
+ // validate the individuallySentPosition is initialized by the
individuallyDeletedMessages
+ // if it is not initialized expectedly, it has sent-hole of key-c
messages because key-c messages are not scheduled to be dispatched to some
consumer(already acked).
+ assertEquals(dispatcher.getLastSentPositionField(),
expectedLastSentPosition);
+ assertEquals(individuallySentPositionsField.toString(),
expectedIndividuallySentPositions.toString());
+ }
+
+ @Test(timeOut = 30_000)
+ public void testLastSentPositionWhenResettingCursor() throws Exception {
+ // The lastSentPosition and individuallySentPositions should be
cleared if reset-cursor operation is executed.
+ final String nsName = "public/default";
+ final String topicName = "persistent://" + nsName + "/reset-cursor-" +
UUID.randomUUID();
+ final String subName = "my-sub";
+
+ final int numMessages = 10;
+ final List<String> keys = Arrays.asList("key-a", "key-b");
+ final AtomicInteger ackCounter = new AtomicInteger();
+
+ @Cleanup
+ final Producer<Integer> producer =
pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+
+ final Supplier<ConsumerBuilder<Integer>> cb = () ->
pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .receiverQueueSize(0)
+ .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()
+ .setAllowOutOfOrderDelivery(false));
+
+ @Cleanup
+ final Consumer<Integer> c1 = cb.get().consumerName("c1").subscribe();
+ @Cleanup
+ final Consumer<Integer> c2 = cb.get().consumerName("c2").subscribe();
+
+ // set retention policy
+ admin.namespaces().setRetention(nsName, new RetentionPolicies(1, 1024
* 1024));
+
+ // enforce the selector will return c1 if keys.get(0)
+ final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+ (PersistentStickyKeyDispatcherMultipleConsumers)
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+ final int hashA =
Murmur3_32Hash.getInstance().makeHash(keys.get(0).getBytes());
+ final Map<Integer, String> hashConsumerMap = new HashMap<>();
+ hashConsumerMap.put(hashA, c1.getConsumerName());
+ final Field selectorField =
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
+ selectorField.setAccessible(true);
+ final StickyKeyConsumerSelector selector =
spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
+ selectorField.set(dispatcher, selector);
+ doAnswer((invocationOnMock -> {
+ final int hash = invocationOnMock.getArgument(0);
+ final String consumerName = hashConsumerMap.getOrDefault(hash,
c2.getConsumerName());
+ return dispatcher.getConsumers().stream().filter(consumer ->
consumer.consumerName().equals(consumerName)).findFirst().get();
+ })).when(selector).select(anyInt());
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().key(keys.get(i %
keys.size())).value(i).send();
+ }
+
+ // consume some messages
+ for (int i = 0; i < numMessages / keys.size(); i++) {
+ final Message<Integer> msg = c2.receive();
+ if (msg != null) {
+ c2.acknowledge(msg);
+ ackCounter.getAndIncrement();
+ }
+ }
+ assertEquals(ackCounter.get(), numMessages / keys.size());
+
+ // store current lastSentPosition for comparison
+ final LongPairRangeSet<Position> individuallySentPositionsField =
dispatcher.getIndividuallySentPositionsField();
+ assertNotNull(dispatcher.getLastSentPositionField());
+ assertFalse(individuallySentPositionsField.isEmpty());
+
+ // reset cursor and receive a message
+ admin.topics().resetCursor(topicName, subName, MessageId.earliest,
true);
+
+ // validate the lastSentPosition and individuallySentPositions are
cleared after resetting cursor
+ assertNull(dispatcher.getLastSentPositionField());
+ assertTrue(individuallySentPositionsField.isEmpty());
+ }
+
+ @Test(timeOut = 30_000)
+ public void testLastSentPositionWhenSkipping() throws Exception {
+ // The lastSentPosition and individuallySentPositions should be
updated if skip operation is executed.
+ // There are updated to follow the new markDeletedPosition.
+ final String topicName = "persistent://public/default/skip-" +
UUID.randomUUID();
+ final String subName = "my-sub";
+
+ final int numMessages = 10;
+ final List<String> keys = Arrays.asList("key-a", "key-b");
+ final int numSkip = 2;
+ final AtomicInteger ackCounter = new AtomicInteger();
+
+ @Cleanup
+ final Producer<Integer> producer =
pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+
+ final Supplier<ConsumerBuilder<Integer>> cb = () ->
pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()
+ .setAllowOutOfOrderDelivery(false))
+ .receiverQueueSize(0);
+
+ @Cleanup
+ final Consumer<Integer> c1 = cb.get().consumerName("c1").subscribe();
+ @Cleanup
+ final Consumer<Integer> c2 = cb.get().consumerName("c2").subscribe();
+
+ // enforce the selector will return c1 if keys.get(0)
+ final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+ (PersistentStickyKeyDispatcherMultipleConsumers)
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+ final int hashA =
Murmur3_32Hash.getInstance().makeHash(keys.get(0).getBytes());
+ final Map<Integer, String> hashConsumerMap = new HashMap<>();
+ hashConsumerMap.put(hashA, c1.getConsumerName());
+ final Field selectorField =
PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
+ selectorField.setAccessible(true);
+ final StickyKeyConsumerSelector selector =
spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
+ selectorField.set(dispatcher, selector);
+ doAnswer((invocationOnMock -> {
+ final int hash = invocationOnMock.getArgument(0);
+ final String consumerName = hashConsumerMap.getOrDefault(hash,
c2.getConsumerName());
+ return dispatcher.getConsumers().stream().filter(consumer ->
consumer.consumerName().equals(consumerName)).findFirst().get();
+ })).when(selector).select(anyInt());
+
+ final List<Position> positionList = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ final MessageIdImpl msgId = (MessageIdImpl)
producer.newMessage().key(keys.get(i % keys.size())).value(i).send();
+ positionList.add(PositionFactory.create(msgId.getLedgerId(),
msgId.getEntryId()));
+ }
+
+ // consume some messages
+ for (int i = 0; i < numSkip; i++) {
+ final Message<Integer> msg = c2.receive();
+ if (msg != null) {
+ c2.acknowledge(msg);
+ ackCounter.getAndIncrement();
+ }
+ }
+ assertEquals(ackCounter.get(), numSkip);
+ final ManagedCursorImpl managedCursor = ((ManagedCursorImpl)
((PersistentSubscription)
pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName)).getCursor());
+ Awaitility.await().untilAsserted(() ->
assertEquals(managedCursor.getIndividuallyDeletedMessagesSet().size(), 2));
+
+ // store current lastSentPosition for comparison
+ final Position lastSentPositionBeforeSkip =
dispatcher.getLastSentPositionField();
+ final LongPairRangeSet<Position> individuallySentPositionsField =
dispatcher.getIndividuallySentPositionsField();
+ assertNotNull(lastSentPositionBeforeSkip);
+ assertFalse(individuallySentPositionsField.isEmpty());
+
+ // skip messages and receive a message
+ admin.topics().skipMessages(topicName, subName, numSkip);
+ final MessageIdImpl msgIdAfterSkip = (MessageIdImpl)
c1.receive().getMessageId();
+ final Position positionAfterSkip =
PositionFactory.create(msgIdAfterSkip.getLedgerId(),
+ msgIdAfterSkip.getEntryId());
+ assertEquals(positionAfterSkip, positionList.get(4));
+
+ // validate the lastSentPosition is updated to the new
markDeletedPosition
+ // validate the individuallySentPositions is updated expectedly
(removeAtMost the new markDeletedPosition)
+ final Position lastSentPosition =
dispatcher.getLastSentPositionField();
+ assertNotNull(lastSentPosition);
+ assertTrue(lastSentPosition.compareTo(lastSentPositionBeforeSkip) > 0);
+ assertEquals(lastSentPosition, positionList.get(4));
+ assertTrue(individuallySentPositionsField.isEmpty());
+ }
private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String
subscription) {
if
(TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index d2d3600df96..5f2cf7b209e 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -72,8 +72,8 @@ public interface ConsumerStats {
/** Flag to verify if consumer is blocked due to reaching threshold of
unacked messages. */
boolean isBlockedConsumerOnUnackedMsgs();
- /** The read position of the cursor when the consumer joining. */
- String getReadPositionWhenJoining();
+ /** The last sent position of the cursor when the consumer joining. */
+ String getLastSentPositionWhenJoining();
/** Address of this consumer. */
String getAddress();
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index d4850adaa6f..cabef1ca960 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -118,6 +118,12 @@ public interface SubscriptionStats {
/** This is for Key_Shared subscription to get the recentJoinedConsumers
in the Key_Shared subscription. */
Map<String, String> getConsumersAfterMarkDeletePosition();
+ /** The last sent position of the cursor. This is for Key_Shared
subscription. */
+ String getLastSentPosition();
+
+ /** Set of individually sent ranges. This is for Key_Shared subscription.
*/
+ String getIndividuallySentPositions();
+
/** SubscriptionProperties (key/value strings) associated with this
subscribe. */
Map<String, String> getSubscriptionProperties();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index de36b330b7f..b4c5d21e692 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -77,8 +77,8 @@ public class ConsumerStatsImpl implements ConsumerStats {
/** Flag to verify if consumer is blocked due to reaching threshold of
unacked messages. */
public boolean blockedConsumerOnUnackedMsgs;
- /** The read position of the cursor when the consumer joining. */
- public String readPositionWhenJoining;
+ /** The last sent position of the cursor when the consumer joining. */
+ public String lastSentPositionWhenJoining;
/** Address of this consumer. */
private String address;
@@ -113,7 +113,7 @@ public class ConsumerStatsImpl implements ConsumerStats {
this.availablePermits += stats.availablePermits;
this.unackedMessages += stats.unackedMessages;
this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs;
- this.readPositionWhenJoining = stats.readPositionWhenJoining;
+ this.lastSentPositionWhenJoining = stats.lastSentPositionWhenJoining;
return this;
}
@@ -141,8 +141,8 @@ public class ConsumerStatsImpl implements ConsumerStats {
this.clientVersion = clientVersion;
}
- public String getReadPositionWhenJoining() {
- return readPositionWhenJoining;
+ public String getLastSentPositionWhenJoining() {
+ return lastSentPositionWhenJoining;
}
public String getLastAckedTime() {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index a8ea0060629..ab4d07c7ae4 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -126,6 +126,12 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
/** This is for Key_Shared subscription to get the recentJoinedConsumers
in the Key_Shared subscription. */
public Map<String, String> consumersAfterMarkDeletePosition;
+ /** The last sent position of the cursor. This is for Key_Shared
subscription. */
+ public String lastSentPosition;
+
+ /** Set of individually sent ranges. This is for Key_Shared subscription.
*/
+ public String individuallySentPositions;
+
/** The number of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRanges;