This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 0d383ba [broker] Fix issue that message ordering could be broken when
redelivering messages on Key_Shared subscription (#10762)
0d383ba is described below
commit 0d383ba91636554c67bf0fc5edef8eb22afad77f
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Jul 15 20:44:54 2021 +0900
[broker] Fix issue that message ordering could be broken when redelivering
messages on Key_Shared subscription (#10762)
Messages with the same key can be out of order if message redelivery occurs
on a Key_Shared subscription.
1. Suppose `PersistentDispatcherMultipleConsumers#messagesToRedeliver`
contains message-1 and message-2. Message-1 will be delivered to consumer-a and
message-2 will be delivered to consumer-b.
2. The dispatcher tried to send message-1 to consumer-a, but the consumer
was too slow to receive it, so the message was not sent.
3. Consumer-a is added to `stuckConsumers`.
https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L263-L266
4. The next time `readMoreEntries()` is run, `getMessagesToReplayNow()`
will return an empty Set because `isDispatcherStuckOnReplays` is true.
https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L368-L374
5. The dispatcher reads newer messages instead of the messages contained in
`messagesToRedeliver`.
https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L233-L267
6. A new message (message-3), which has the same key as message-2, is delivered to consumer-b.
7. Message-2 contained in messagesToRedeliver is delivered to consumer-b.
8. As a result, the order of message-2 and message-3 is reversed.
When adding a message to be redelivered to `messagesToRedeliver`, save the
hash of the key that the message has. If the dispatcher attempts to send newer
messages to a consumer and any of their keys corresponds to one of the saved
hash values, those messages will be added to `messagesToRedeliver` instead of being sent.
This prevents messages with the same key from being out of order.
(cherry picked from commit 5aee5998f6b9e500873e1290f85a21eb11d1cd63)
---
...ConsistentHashingStickyKeyConsumerSelector.java | 4 +-
.../org/apache/pulsar/broker/service/Consumer.java | 18 +-
...ashRangeAutoSplitStickyKeyConsumerSelector.java | 4 +-
...ashRangeExclusiveStickyKeyConsumerSelector.java | 9 +-
.../broker/service/StickyKeyConsumerSelector.java | 17 +-
.../persistent/MessageRedeliveryController.java | 120 ++++++++++++
.../PersistentDispatcherMultipleConsumers.java | 98 +++++-----
...istentStickyKeyDispatcherMultipleConsumers.java | 29 ++-
.../broker/service/PersistentTopicE2ETest.java | 21 +-
.../MessageRedeliveryControllerTest.java | 213 +++++++++++++++++++++
...ntStickyKeyDispatcherMultipleConsumersTest.java | 152 ++++++++++++++-
.../apache/pulsar/common/protocol/Commands.java | 18 ++
12 files changed, 614 insertions(+), 89 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index 96029d8..e280048 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -105,9 +105,7 @@ public class ConsistentHashingStickyKeyConsumerSelector
implements StickyKeyCons
}
@Override
- public Consumer select(byte[] stickyKey) {
- int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
-
+ public Consumer select(int hash) {
rwLock.readLock().lock();
try {
if (hashRing.isEmpty()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 475fdc9..b6557f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -54,6 +54,7 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
@@ -236,7 +237,8 @@ public class Consumer {
Entry entry = entries.get(i);
if (entry != null) {
int batchSize = batchSizes.getBatchSize(i);
- pendingAcks.put(entry.getLedgerId(), entry.getEntryId(),
batchSize, 0);
+ int stickyKeyHash = getStickyKeyHash(entry);
+ pendingAcks.put(entry.getLedgerId(), entry.getEntryId(),
batchSize, stickyKeyHash);
if (log.isDebugEnabled()){
log.debug("[{}-{}] Added {}:{} ledger entry with
batchSize of {} to pendingAcks in"
+ " broker.service.Consumer for
consumerId: {}",
@@ -729,7 +731,7 @@ public class Consumer {
if (pendingAcks != null) {
List<PositionImpl> pendingPositions = new ArrayList<>((int)
pendingAcks.size());
MutableInt totalRedeliveryMessages = new MutableInt(0);
- pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> {
+ pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash)
-> {
totalRedeliveryMessages.add((int) batchSize);
pendingPositions.add(new PositionImpl(ledgerId, entryId));
});
@@ -752,10 +754,11 @@ public class Consumer {
List<PositionImpl> pendingPositions = Lists.newArrayList();
for (MessageIdData msg : messageIds) {
PositionImpl position = PositionImpl.get(msg.getLedgerId(),
msg.getEntryId());
- LongPair batchSize = pendingAcks.get(position.getLedgerId(),
position.getEntryId());
- if (batchSize != null) {
+ LongPair longPair = pendingAcks.get(position.getLedgerId(),
position.getEntryId());
+ if (longPair != null) {
+ long batchSize = longPair.first;
pendingAcks.remove(position.getLedgerId(),
position.getEntryId());
- totalRedeliveryMessages += batchSize.first;
+ totalRedeliveryMessages += batchSize;
pendingPositions.add(position);
}
}
@@ -810,5 +813,10 @@ public class Consumer {
return cnx;
}
+ private int getStickyKeyHash(Entry entry) {
+ byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(),
topicName, subscription.getName());
+ return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+ }
+
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index f74165e..b715c51 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
import java.util.ArrayList;
import java.util.Collections;
@@ -104,8 +103,7 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector
implements StickyKeyCon
}
@Override
- public Consumer select(byte[] stickyKey) {
- int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
+ public Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
return rangeMap.ceilingEntry(slot).getValue();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index 75cb6b5..d8aea2e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;
import org.apache.pulsar.common.api.proto.PulsarApi;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
import java.util.ArrayList;
import java.util.Collections;
@@ -66,11 +65,6 @@ public class HashRangeExclusiveStickyKeyConsumerSelector
implements StickyKeyCon
}
@Override
- public Consumer select(byte[] stickyKey) {
- return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
- }
-
- @Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new HashMap<>();
Map.Entry<Integer, Consumer> prev = null;
@@ -88,7 +82,8 @@ public class HashRangeExclusiveStickyKeyConsumerSelector
implements StickyKeyCon
return result;
}
- Consumer select(int hash) {
+ @Override
+ public Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
Map.Entry<Integer, Consumer> ceilingEntry =
rangeMap.ceilingEntry(slot);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 0d686f7..5ea6f7f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -22,6 +22,7 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignExc
import java.util.List;
import java.util.Map;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
public interface StickyKeyConsumerSelector {
@@ -45,7 +46,21 @@ public interface StickyKeyConsumerSelector {
* @param stickyKey sticky key
* @return consumer
*/
- Consumer select(byte[] stickyKey);
+ default Consumer select(byte[] stickyKey) {
+ return select(makeStickyKeyHash(stickyKey));
+ }
+
+ static int makeStickyKeyHash(byte[] stickyKey) {
+ return Murmur3_32Hash.getInstance().makeHash(stickyKey);
+ }
+
+ /**
+ * Select a consumer by hash.
+ *
+ * @param hash hash corresponding to sticky key
+ * @return consumer
+ */
+ Consumer select(int hash);
/**
* Get key hash ranges handled by each consumer
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
new file mode 100644
index 0000000..be14356
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+ private final LongPairSet messagesToRedeliver;
+ private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+ public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+ this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+ this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new
ConcurrentLongLongPairHashMap(128, 2);
+ }
+
+ public boolean add(long ledgerId, long entryId) {
+ return messagesToRedeliver.add(ledgerId, entryId);
+ }
+
+ public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+ if (hashesToBeBlocked != null) {
+ hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+ }
+ return messagesToRedeliver.add(ledgerId, entryId);
+ }
+
+ public boolean remove(long ledgerId, long entryId) {
+ if (hashesToBeBlocked != null) {
+ hashesToBeBlocked.remove(ledgerId, entryId);
+ }
+ return messagesToRedeliver.remove(ledgerId, entryId);
+ }
+
+ public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+ if (hashesToBeBlocked != null) {
+ List<LongPair> keysToRemove = new ArrayList<>();
+ hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none)
-> {
+ if (ComparisonChain.start().compare(ledgerId,
markDeleteLedgerId).compare(entryId, markDeleteEntryId)
+ .result() <= 0) {
+ keysToRemove.add(new LongPair(ledgerId, entryId));
+ }
+ });
+ keysToRemove.forEach(longPair ->
hashesToBeBlocked.remove(longPair.first, longPair.second));
+ keysToRemove.clear();
+ }
+ return messagesToRedeliver.removeIf((ledgerId, entryId) -> {
+ return ComparisonChain.start().compare(ledgerId,
markDeleteLedgerId).compare(entryId, markDeleteEntryId)
+ .result() <= 0;
+ });
+ }
+
+ public boolean isEmpty() {
+ return messagesToRedeliver.isEmpty();
+ }
+
+ public void clear() {
+ if (hashesToBeBlocked != null) {
+ hashesToBeBlocked.clear();
+ }
+ messagesToRedeliver.clear();
+ }
+
+ public String toString() {
+ return messagesToRedeliver.toString();
+ }
+
+ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
+ final AtomicBoolean isContained = new AtomicBoolean(false);
+ if (hashesToBeBlocked != null) {
+ hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none)
-> {
+ if (!isContained.get() && stickyKeyHashes.contains((int)
stickyKeyHash)) {
+ isContained.set(true);
+ }
+ });
+ }
+ return isContained.get();
+ }
+
+ public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+ if (hashesToBeBlocked != null) {
+ // allowOutOfOrderDelivery is false
+ return messagesToRedeliver.items().stream()
+ .sorted((l1, l2) ->
ComparisonChain.start().compare(l1.first, l2.first)
+ .compare(l1.second, l2.second).result())
+ .limit(maxMessagesToRead).map(longPair -> new
PositionImpl(longPair.first, longPair.second))
+ .collect(Collectors.toCollection(TreeSet::new));
+ } else {
+ // allowOutOfOrderDelivery is true
+ return messagesToRedeliver.items(maxMessagesToRead,
+ (ledgerId, entryId) -> new PositionImpl(ledgerId,
entryId));
+ }
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6b2b4ec..adc5b2b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -20,10 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
-
-import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
-
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -57,19 +54,17 @@ import
org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
-import org.apache.pulsar.common.util.collections.LongPairSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +77,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
protected volatile Range<PositionImpl>
lastIndividualDeletedRangeFromCursorRecovery;
private CompletableFuture<Void> closeFuture = null;
- LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+ protected final MessageRedeliveryController redeliveryMessages;
protected final RedeliveryTracker redeliveryTracker;
private Optional<DelayedDeliveryTracker> delayedDeliveryTracker =
Optional.empty();
@@ -109,12 +104,19 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
Normal, Replay
}
- public PersistentDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor, Subscription subscription) {
+ public PersistentDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor,
+ Subscription subscription) {
+ this(topic, cursor, subscription, true);
+ }
+
+ public PersistentDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor, Subscription subscription,
+ boolean allowOutOfOrderDelivery) {
super(subscription,
topic.getBrokerService().pulsar().getConfiguration());
this.cursor = cursor;
this.lastIndividualDeletedRangeFromCursorRecovery =
cursor.getLastIndividualDeletedRange();
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
this.topic = topic;
+ this.redeliveryMessages = new
MessageRedeliveryController(allowOutOfOrderDelivery);
this.redeliveryTracker =
this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
? new InMemoryRedeliveryTracker()
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
@@ -137,7 +139,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
cursor.rewind();
shouldRewindBeforeReadingOrReplaying = false;
}
- messagesToRedeliver.clear();
+ redeliveryMessages.clear();
}
if (isConsumersExceededOnSubscription()) {
@@ -195,7 +197,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
havePendingRead = false;
}
- messagesToRedeliver.clear();
+ redeliveryMessages.clear();
redeliveryTracker.clear();
if (closeFuture != null) {
log.info("[{}] All consumers removed. Subscription is
disconnected", name);
@@ -206,8 +208,8 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer are left, reading more entries",
name);
}
- consumer.getPendingAcks().forEach((ledgerId, entryId,
batchSize, none) -> {
- if (addMessageToReplay(ledgerId, entryId)) {
+ consumer.getPendingAcks().forEach((ledgerId, entryId,
batchSize, stickyKeyHash) -> {
+ if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) {
redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
}
});
@@ -333,9 +335,9 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
asyncReplayEntriesInOrder(messagesToReplayNow) :
asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
- deletedMessages.forEach(position ->
messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
+ deletedMessages.forEach(position ->
redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
((PositionImpl) position).getEntryId()));
- // if all the entries are acked-entries and cleared up from
messagesToRedeliver, try to read
+ // if all the entries are acked-entries and cleared up from
redeliveryMessages, try to read
// next entries as readCompletedEntries-callback was never
called
if ((messagesToReplayNow.size() - deletedMessages.size()) ==
0) {
havePendingReplayRead = false;
@@ -516,7 +518,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
// remove positions first from replay list first :
sendMessages recycles entries
if (readType == ReadType.Replay) {
entries.subList(start, start + messagesForC).forEach(entry
-> {
- messagesToRedeliver.remove(entry.getLedgerId(),
entry.getEntryId());
+ redeliveryMessages.remove(entry.getLedgerId(),
entry.getEntryId());
});
}
@@ -563,7 +565,8 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
entries.size() - start);
}
entries.subList(start, entries.size()).forEach(entry -> {
- addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
+ long stickyKeyHash = getStickyKeyHash(entry);
+ addMessageToReplay(entry.getLedgerId(), entry.getEntryId(),
stickyKeyHash);
entry.release();
});
}
@@ -609,11 +612,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
havePendingReplayRead = false;
if (exception instanceof
ManagedLedgerException.InvalidReplayPositionException) {
PositionImpl markDeletePosition = (PositionImpl)
cursor.getMarkDeletedPosition();
-
- messagesToRedeliver.removeIf((ledgerId, entryId) -> {
- return ComparisonChain.start().compare(ledgerId,
markDeletePosition.getLedgerId())
- .compare(entryId,
markDeletePosition.getEntryId()).result() <= 0;
- });
+
redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
}
}
@@ -679,12 +678,12 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer
consumer) {
- consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none)
-> {
- addMessageToReplay(ledgerId, entryId);
+ consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize,
stickyKeyHash) -> {
+ addMessageToReplay(ledgerId, entryId, stickyKeyHash);
});
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Redelivering unacknowledged messages for
consumer {}", name, consumer,
- messagesToRedeliver);
+ redeliveryMessages);
}
readMoreEntries();
}
@@ -692,6 +691,8 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer
consumer, List<PositionImpl> positions) {
positions.forEach(position -> {
+ // TODO: We want to pass a sticky key hash as a third argument to
guarantee the order of the messages
+ // on Key_Shared subscription, but it's difficult to get the
sticky key here
if (addMessageToReplay(position.getLedgerId(),
position.getEntryId())) {
redeliveryTracker.addIfAbsent(position);
}
@@ -793,29 +794,9 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
}
- @Override
- public boolean trackDelayedDelivery(long ledgerId, long entryId,
MessageMetadata msgMetadata) {
- if (!topic.isDelayedDeliveryEnabled()) {
- // If broker has the feature disabled, always deliver messages
immediately
- return false;
- }
-
- synchronized (this) {
- if (!delayedDeliveryTracker.isPresent()) {
- // Initialize the tracker the first time we need to use it
- delayedDeliveryTracker = Optional
-
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
- }
-
-
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
- return delayedDeliveryTracker.get().addMessage(ledgerId, entryId,
msgMetadata.getDeliverAtTime());
- }
- }
-
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int
maxMessagesToRead) {
- if (!messagesToRedeliver.isEmpty()) {
- return messagesToRedeliver.items(maxMessagesToRead,
- (ledgerId, entryId) -> new PositionImpl(ledgerId,
entryId));
+ if (!redeliveryMessages.isEmpty()) {
+ return
redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
} else if (delayedDeliveryTracker.isPresent() &&
delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
return
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
@@ -841,20 +822,37 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
}
}
+ protected boolean addMessageToReplay(long ledgerId, long entryId, long
stickyKeyHash) {
+ if (checkIfMessageIsUnacked(ledgerId, entryId)) {
+ redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
protected boolean addMessageToReplay(long ledgerId, long entryId) {
- PositionImpl markDeletePosition = (PositionImpl)
cursor.getMarkDeletedPosition();
- if (markDeletePosition == null || ledgerId >
markDeletePosition.getLedgerId()
- || (ledgerId == markDeletePosition.getLedgerId() && entryId >
markDeletePosition.getEntryId())) {
- messagesToRedeliver.add(ledgerId, entryId);
+ if (checkIfMessageIsUnacked(ledgerId, entryId)) {
+ redeliveryMessages.add(ledgerId, entryId);
return true;
} else {
return false;
}
}
+ private boolean checkIfMessageIsUnacked(long ledgerId, long entryId) {
+ PositionImpl markDeletePosition = (PositionImpl)
cursor.getMarkDeletedPosition();
+ return (markDeletePosition == null || ledgerId >
markDeletePosition.getLedgerId()
+ || (ledgerId == markDeletePosition.getLedgerId() && entryId >
markDeletePosition.getEntryId()));
+ }
+
public PersistentTopic getTopic() {
return topic;
}
+ protected int getStickyKeyHash(Entry entry) {
+ return
StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 45921dd..299f545 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -72,7 +72,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf,
KeySharedMeta ksm) {
- super(topic, cursor, subscription);
+ super(topic, cursor, subscription, ksm.getAllowOutOfOrderDelivery());
this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new
LinkedHashMap<>();
@@ -125,7 +125,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
public synchronized void removeConsumer(Consumer consumer) throws
BrokerServiceException {
// The consumer must be removed from the selector before calling the
superclass removeConsumer method.
// In the superclass removeConsumer method, the pending acks that the
consumer has are added to
- // messagesToRedeliver. If the consumer has not been removed from the
selector at this point,
+ // redeliveryMessages. If the consumer has not been removed from the
selector at this point,
// the broker will try to redeliver the messages to the consumer that
has already been closed.
// As a result, the messages are not redelivered to any consumer, and
the mark-delete position does not move,
// eventually causing all consumers to get stuck.
@@ -136,7 +136,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
if (consumerList.size() == 1) {
recentlyJoinedConsumers.clear();
}
- if (removeConsumersFromRecentJoinedConsumers() ||
messagesToRedeliver.size() > 0) {
+ if (removeConsumersFromRecentJoinedConsumers() ||
!redeliveryMessages.isEmpty()) {
readMoreEntries();
}
}
@@ -171,10 +171,13 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
final Map<Consumer, List<Entry>> groupedEntries =
localGroupedEntries.get();
groupedEntries.clear();
+ final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new
HashMap<>();
for (Entry entry : entries) {
- Consumer c = selector.select(peekStickyKey(entry.getDataBuffer()));
+ int stickyKeyHash = getStickyKeyHash(entry);
+ Consumer c = selector.select(stickyKeyHash);
groupedEntries.computeIfAbsent(c, k -> new
ArrayList<>()).add(entry);
+ consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new
HashSet<>()).add(stickyKeyHash);
}
AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
@@ -189,7 +192,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
int entriesWithSameKeyCount = entriesWithSameKey.size();
final int availablePermits = consumer == null ? 0 :
Math.max(consumer.getAvailablePermits(), 0);
int maxMessagesForC = Math.min(entriesWithSameKeyCount,
availablePermits);
- int messagesForC = getRestrictedMaxEntriesForConsumer(consumer,
entriesWithSameKey, maxMessagesForC, readType);
+ int messagesForC = getRestrictedMaxEntriesForConsumer(consumer,
entriesWithSameKey, maxMessagesForC,
+ readType, consumerStickyKeyHashesMap.get(consumer));
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} with messages num {}, read
type is {}",
name, consumer.consumerName(), messagesForC, readType);
@@ -200,7 +204,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
// so we discard for now and mark them for later redelivery
for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
Entry entry = entriesWithSameKey.get(i);
- addMessageToReplay(entry.getLedgerId(),
entry.getEntryId());
+ long stickyKeyHash = getStickyKeyHash(entry);
+ addMessageToReplay(entry.getLedgerId(),
entry.getEntryId(), stickyKeyHash);
entry.release();
entriesWithSameKey.set(i, null);
}
@@ -211,7 +216,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
if (readType == ReadType.Replay) {
for (int i = 0; i < messagesForC; i++) {
Entry entry = entriesWithSameKey.get(i);
- messagesToRedeliver.remove(entry.getLedgerId(),
entry.getEntryId());
+ redeliveryMessages.remove(entry.getLedgerId(),
entry.getEntryId());
}
}
@@ -279,12 +284,19 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
- private int getRestrictedMaxEntriesForConsumer(Consumer consumer,
List<Entry> entries, int maxMessages, ReadType readType) {
+ private int getRestrictedMaxEntriesForConsumer(Consumer consumer,
List<Entry> entries, int maxMessages,
+ ReadType readType, Set<Integer> stickyKeyHashes) {
if (maxMessages == 0) {
// the consumer was stuck
nextStuckConsumers.add(consumer);
return 0;
}
+ if (readType == ReadType.Normal && stickyKeyHashes != null
+ &&
redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
+ // If redeliveryMessages contains messages that correspond to the
same hash as the messages
+ // that the dispatcher is trying to send, do not send those
messages for order guarantee
+ return 0;
+ }
if (recentlyJoinedConsumers == null) {
return maxMessages;
}
@@ -362,6 +374,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
return hasConsumerRemovedFromTheRecentJoinedConsumers;
}
+ @Override
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int
maxMessagesToRead) {
if (isDispatcherStuckOnReplays) {
// If we're stuck on replay, we want to move forward reading on
the topic (until the overall max-unacked
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index c7344d6..43645bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -80,7 +81,6 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -1571,9 +1571,10 @@ public class PersistentTopicE2ETest extends
BrokerTestBase {
PersistentSubscription subRef = topicRef.getSubscription(subName);
PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) subRef
.getDispatcher();
- Field replayMap =
PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToRedeliver");
- replayMap.setAccessible(true);
- ConcurrentLongPairSet messagesToReplay = new ConcurrentLongPairSet(64,
1);
+ Field redeliveryMessagesField =
PersistentDispatcherMultipleConsumers.class
+ .getDeclaredField("redeliveryMessages");
+ redeliveryMessagesField.setAccessible(true);
+ MessageRedeliveryController redeliveryMessages = new
MessageRedeliveryController(true);
assertNotNull(subRef);
@@ -1594,24 +1595,24 @@ public class PersistentTopicE2ETest extends
BrokerTestBase {
}
if (i < replayIndex) {
// (3) accumulate acked messages for replay
- messagesToReplay.add(msgId.getLedgerId(), msgId.getEntryId());
+ redeliveryMessages.add(msgId.getLedgerId(),
msgId.getEntryId());
}
}
// (4) redelivery : should redeliver only unacked messages
Thread.sleep(1000);
- replayMap.set(dispatcher, messagesToReplay);
+ redeliveryMessagesField.set(dispatcher, redeliveryMessages);
// (a) redelivery with all acked-message should clear messageReply
bucket
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
- return messagesToReplay.isEmpty();
+ return redeliveryMessages.isEmpty();
});
- assertEquals(messagesToReplay.size(), 0);
+ assertTrue(redeliveryMessages.isEmpty());
// (b) fill messageReplyBucket with already acked entry again: and try
to publish new msg and read it
- messagesToReplay.add(firstAckedMsg.getLedgerId(),
firstAckedMsg.getEntryId());
- replayMap.set(dispatcher, messagesToReplay);
+ redeliveryMessages.add(firstAckedMsg.getLedgerId(),
firstAckedMsg.getEntryId());
+ redeliveryMessagesField.set(dispatcher, redeliveryMessages);
// send new message
final String testMsg = "testMsg";
producer.send(testMsg.getBytes());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
new file mode 100644
index 0000000..9a785f6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertEqualsNoOrder;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class MessageRedeliveryControllerTest {
+ @DataProvider(name = "allowOutOfOrderDelivery")
+ public Object[][] dataProvider() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000)
+ public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws
Exception {
+ MessageRedeliveryController controller = new
MessageRedeliveryController(allowOutOfOrderDelivery);
+
+ Field messagesToRedeliverField =
MessageRedeliveryController.class.getDeclaredField("messagesToRedeliver");
+ messagesToRedeliverField.setAccessible(true);
+ LongPairSet messagesToRedeliver = (LongPairSet)
messagesToRedeliverField.get(controller);
+
+ Field hashesToBeBlockedField =
MessageRedeliveryController.class.getDeclaredField("hashesToBeBlocked");
+ hashesToBeBlockedField.setAccessible(true);
+ ConcurrentLongLongPairHashMap hashesToBeBlocked =
(ConcurrentLongLongPairHashMap) hashesToBeBlockedField
+ .get(controller);
+
+ if (allowOutOfOrderDelivery) {
+ assertNull(hashesToBeBlocked);
+ } else {
+ assertNotNull(hashesToBeBlocked);
+ }
+
+ assertTrue(controller.isEmpty());
+ assertEquals(messagesToRedeliver.size(), 0);
+ if (!allowOutOfOrderDelivery) {
+ assertEquals(hashesToBeBlocked.size(), 0);
+ }
+
+ assertTrue(controller.add(1, 1));
+ assertTrue(controller.add(1, 2));
+ assertFalse(controller.add(1, 1));
+
+ assertFalse(controller.isEmpty());
+ assertEquals(messagesToRedeliver.size(), 2);
+ assertTrue(messagesToRedeliver.contains(1, 1));
+ assertTrue(messagesToRedeliver.contains(1, 2));
+ if (!allowOutOfOrderDelivery) {
+ assertEquals(hashesToBeBlocked.size(), 0);
+ assertFalse(hashesToBeBlocked.containsKey(1, 1));
+ assertFalse(hashesToBeBlocked.containsKey(1, 2));
+ }
+
+ assertTrue(controller.remove(1, 1));
+ assertTrue(controller.remove(1, 2));
+ assertFalse(controller.remove(1, 1));
+
+ assertTrue(controller.isEmpty());
+ assertEquals(messagesToRedeliver.size(), 0);
+ assertFalse(messagesToRedeliver.contains(1, 1));
+ assertFalse(messagesToRedeliver.contains(1, 2));
+ if (!allowOutOfOrderDelivery) {
+ assertEquals(hashesToBeBlocked.size(), 0);
+ }
+
+ assertTrue(controller.add(2, 1, 100));
+ assertTrue(controller.add(2, 2, 101));
+ assertTrue(controller.add(2, 3, 101));
+ assertFalse(controller.add(2, 1, 100));
+
+ assertFalse(controller.isEmpty());
+ assertEquals(messagesToRedeliver.size(), 3);
+ assertTrue(messagesToRedeliver.contains(2, 1));
+ assertTrue(messagesToRedeliver.contains(2, 2));
+ assertTrue(messagesToRedeliver.contains(2, 3));
+ if (!allowOutOfOrderDelivery) {
+ assertEquals(hashesToBeBlocked.size(), 3);
+ assertEquals(hashesToBeBlocked.get(2, 1).first, 100);
+ assertEquals(hashesToBeBlocked.get(2, 2).first, 101);
+ assertEquals(hashesToBeBlocked.get(2, 3).first, 101);
+ }
+
+ controller.clear();
+ assertTrue(controller.isEmpty());
+ assertEquals(messagesToRedeliver.size(), 0);
+ assertTrue(messagesToRedeliver.isEmpty());
+ if (!allowOutOfOrderDelivery) {
+ assertEquals(hashesToBeBlocked.size(), 0);
+ assertTrue(hashesToBeBlocked.isEmpty());
+ }
+
+ controller.add(2, 2, 201);
+ controller.add(1, 3, 100);
+ controller.add(3, 1, 300);
+ controller.add(2, 1, 200);
+ controller.add(3, 2, 301);
+ controller.add(1, 2, 101);
+ controller.add(1, 1, 100);
+
+ controller.removeAllUpTo(1, 3);
+ assertEquals(messagesToRedeliver.size(), 4);
+ assertTrue(messagesToRedeliver.contains(2, 1));
+ assertTrue(messagesToRedeliver.contains(2, 2));
+ assertTrue(messagesToRedeliver.contains(3, 1));
+ assertTrue(messagesToRedeliver.contains(3, 2));
+ if (!allowOutOfOrderDelivery) {
+ assertEquals(hashesToBeBlocked.size(), 4);
+ assertEquals(hashesToBeBlocked.get(2, 1).first, 200);
+ assertEquals(hashesToBeBlocked.get(2, 2).first, 201);
+ assertEquals(hashesToBeBlocked.get(3, 1).first, 300);
+ assertEquals(hashesToBeBlocked.get(3, 2).first, 301);
+ }
+
+ controller.removeAllUpTo(3, 1);
+ assertEquals(messagesToRedeliver.size(), 1);
+ assertTrue(messagesToRedeliver.contains(3, 2));
+ if (!allowOutOfOrderDelivery) {
+ assertEquals(hashesToBeBlocked.size(), 1);
+ assertEquals(hashesToBeBlocked.get(3, 2).first, 301);
+ }
+
+ controller.removeAllUpTo(5, 10);
+ assertTrue(controller.isEmpty());
+ assertEquals(messagesToRedeliver.size(), 0);
+ if (!allowOutOfOrderDelivery) {
+ assertEquals(hashesToBeBlocked.size(), 0);
+ }
+ }
+
+ @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000)
+ public void testContainsStickyKeyHashes(boolean allowOutOfOrderDelivery)
throws Exception {
+ MessageRedeliveryController controller = new
MessageRedeliveryController(allowOutOfOrderDelivery);
+ controller.add(1, 1, 100);
+ controller.add(1, 2, 101);
+ controller.add(1, 3, 102);
+ controller.add(2, 2, 103);
+ controller.add(2, 1, 104);
+
+ if (allowOutOfOrderDelivery) {
+ assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(100)));
+ assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(101, 102, 103)));
+ assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(104, 105)));
+ } else {
+ assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(100)));
+ assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(101, 102, 103)));
+ assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(104, 105)));
+ }
+
+ assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet()));
+ assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(99)));
+ assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(105,
106)));
+ }
+
+ @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000)
+ public void testGetMessagesToReplayNow(boolean allowOutOfOrderDelivery)
throws Exception {
+ MessageRedeliveryController controller = new
MessageRedeliveryController(allowOutOfOrderDelivery);
+ controller.add(2, 2);
+ controller.add(1, 3);
+ controller.add(3, 1);
+ controller.add(2, 1);
+ controller.add(3, 2);
+ controller.add(1, 2);
+ controller.add(1, 1);
+
+ if (allowOutOfOrderDelivery) {
+ // The entries are sorted by ledger ID but not by entry ID
+ PositionImpl[] actual1 =
controller.getMessagesToReplayNow(3).toArray(new PositionImpl[3]);
+ PositionImpl[] expected1 = { PositionImpl.get(1, 1),
PositionImpl.get(1, 2), PositionImpl.get(1, 3) };
+ assertEqualsNoOrder(actual1, expected1);
+ } else {
+ // The entries are completely sorted
+ Set<PositionImpl> actual2 = controller.getMessagesToReplayNow(6);
+ Set<PositionImpl> expected2 = new TreeSet<>();
+ expected2.add(PositionImpl.get(1, 1));
+ expected2.add(PositionImpl.get(1, 2));
+ expected2.add(PositionImpl.get(1, 3));
+ expected2.add(PositionImpl.get(2, 1));
+ expected2.add(PositionImpl.get(2, 2));
+ expected2.add(PositionImpl.get(3, 1));
+ assertEquals(actual2, expected2);
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index a902ac2..041d740 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -22,13 +22,20 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.*;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.mockito.ArgumentCaptor;
@@ -43,13 +50,32 @@ import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anySet;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@@ -279,6 +305,123 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
);
}
+ @Test(timeOut = 30000)
+ public void testMessageRedelivery() throws Exception {
+ final Queue<Position> actualEntriesToConsumer1 = new
ConcurrentLinkedQueue<>();
+ final Queue<Position> actualEntriesToConsumer2 = new
ConcurrentLinkedQueue<>();
+
+ final Queue<Position> expectedEntriesToConsumer1 = new
ConcurrentLinkedQueue<>();
+ expectedEntriesToConsumer1.add(PositionImpl.get(1, 1));
+ final Queue<Position> expectedEntriesToConsumer2 = new
ConcurrentLinkedQueue<>();
+ expectedEntriesToConsumer2.add(PositionImpl.get(1, 2));
+ expectedEntriesToConsumer2.add(PositionImpl.get(1, 3));
+
+ final AtomicInteger remainingEntriesNum = new AtomicInteger(
+ expectedEntriesToConsumer1.size() +
expectedEntriesToConsumer2.size());
+
+ // Messages with key1 are routed to consumer1 and messages with key2
are routed to consumer2
+ final List<Entry> allEntries = new ArrayList<>();
+ allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1,
"key2")));
+ allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2,
"key1")));
+ allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3,
"key1")));
+ allEntries.forEach(entry -> ((EntryImpl) entry).retain());
+
+ final List<Entry> redeliverEntries = new ArrayList<>();
+ redeliverEntries.add(allEntries.get(0)); // message1
+ final List<Entry> readEntries = new ArrayList<>();
+ readEntries.add(allEntries.get(2)); // message3
+
+ final Consumer consumer1 = mock(Consumer.class);
+ doReturn("consumer1").when(consumer1).consumerName();
+ // Change availablePermits of consumer1 to 0 and then back to normal
+ when(consumer1.getAvailablePermits()).thenReturn(0).thenReturn(10);
+ doReturn(true).when(consumer1).isWritable();
+ doAnswer(invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ List<Entry> entries = (List<Entry>)
invocationOnMock.getArgument(0);
+ for (Entry entry : entries) {
+ remainingEntriesNum.decrementAndGet();
+ actualEntriesToConsumer1.add(entry.getPosition());
+ }
+ return channelMock;
+ }).when(consumer1).sendMessages(anyList(), any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
+ anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+
+ final Consumer consumer2 = mock(Consumer.class);
+ doReturn("consumer2").when(consumer2).consumerName();
+ when(consumer2.getAvailablePermits()).thenReturn(10);
+ doReturn(true).when(consumer2).isWritable();
+ doAnswer(invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ List<Entry> entries = (List<Entry>)
invocationOnMock.getArgument(0);
+ for (Entry entry : entries) {
+ remainingEntriesNum.decrementAndGet();
+ actualEntriesToConsumer2.add(entry.getPosition());
+ }
+ return channelMock;
+ }).when(consumer2).sendMessages(anyList(), any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
+ anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+
+ persistentDispatcher.addConsumer(consumer1);
+ persistentDispatcher.addConsumer(consumer2);
+
+ final Field totalAvailablePermitsField =
PersistentDispatcherMultipleConsumers.class
+ .getDeclaredField("totalAvailablePermits");
+ totalAvailablePermitsField.setAccessible(true);
+ totalAvailablePermitsField.set(persistentDispatcher, 1000);
+
+ final Field redeliveryMessagesField =
PersistentDispatcherMultipleConsumers.class
+ .getDeclaredField("redeliveryMessages");
+ redeliveryMessagesField.setAccessible(true);
+ MessageRedeliveryController redeliveryMessages =
(MessageRedeliveryController) redeliveryMessagesField
+ .get(persistentDispatcher);
+ redeliveryMessages.add(allEntries.get(0).getLedgerId(),
allEntries.get(0).getEntryId(),
+ getStickyKeyHash(allEntries.get(0))); // message1
+ redeliveryMessages.add(allEntries.get(1).getLedgerId(),
allEntries.get(1).getEntryId(),
+ getStickyKeyHash(allEntries.get(1))); // message2
+
+ // Mock Cursor#asyncReplayEntries
+ doAnswer(invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ Set<Position> positions = (Set<Position>)
invocationOnMock.getArgument(0);
+ List<Entry> entries = allEntries.stream().filter(entry ->
positions.contains(entry.getPosition()))
+ .collect(Collectors.toList());
+ if (!entries.isEmpty()) {
+ ((PersistentStickyKeyDispatcherMultipleConsumers)
invocationOnMock.getArgument(1))
+ .readEntriesComplete(entries,
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay);
+ }
+ return Collections.emptySet();
+ }).when(cursorMock).asyncReplayEntries(anySet(),
any(PersistentStickyKeyDispatcherMultipleConsumers.class),
+ eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean());
+
+ // Mock Cursor#asyncReadEntriesOrWait
+ doAnswer(invocationOnMock -> {
+ ((PersistentStickyKeyDispatcherMultipleConsumers)
invocationOnMock.getArgument(2))
+ .readEntriesComplete(readEntries,
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ return null;
+ }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(),
+ any(PersistentStickyKeyDispatcherMultipleConsumers.class),
+ eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal));
+
+ // (1) Run sendMessagesToConsumers
+ // (2) Attempts to send message1 to consumer1 but skipped because
availablePermits is 0
+ // (3) Change availablePermits of consumer1 to 10
+ // (4) Run readMoreEntries internally
+ // (5) Run sendMessagesToConsumers internally
+ // (6) Attempts to send message3 to consumer2 but skipped because
redeliveryMessages contains message2
+ persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay,
+ redeliverEntries);
+ while (remainingEntriesNum.get() > 0) {
+ // (7) Run readMoreEntries and resend message1 to consumer1 and
message2-3 to consumer2
+ persistentDispatcher.readMoreEntries();
+ }
+
+ assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1);
+ assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2);
+
+ allEntries.forEach(entry -> entry.release());
+ }
+
private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
@@ -292,4 +435,9 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
messageMetadata.setPublishTime(System.currentTimeMillis());
return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
messageMetadata.build(), Unpooled.copiedBuffer(message.getBytes(UTF_8)));
}
+
+ private int getStickyKeyHash(Entry entry) {
+ byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(),
topicName, subscriptionName);
+ return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 1a0dac5..1cd0385 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -1529,6 +1530,23 @@ public class Commands {
return res;
}
+ private static final byte[] NONE_KEY =
"NONE_KEY".getBytes(StandardCharsets.UTF_8);
+ public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String
topic, String subscription) {
+ try {
+ int readerIdx = metadataAndPayload.readerIndex();
+ MessageMetadata metadata =
Commands.parseMessageMetadata(metadataAndPayload);
+ metadataAndPayload.readerIndex(readerIdx);
+ if (metadata.hasOrderingKey()) {
+ return metadata.getOrderingKey().toByteArray();
+ } else if (metadata.hasPartitionKey()) {
+ return
metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
+ }
+ } catch (Throwable t) {
+ log.error("[{}] [{}] Failed to peek sticky key from the message
metadata", topic, subscription, t);
+ }
+ return Commands.NONE_KEY;
+ }
+
// ---- transaction related ----
public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) {