This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 73ef162 Fix Issue #12885, Unordered consuming case in Key_Shared
subscription (#12890)
73ef162 is described below
commit 73ef1621ab0bbecfcb2325453a4d93a406fcba3c
Author: JiangHaiting <[email protected]>
AuthorDate: Tue Nov 30 20:34:45 2021 +0800
Fix Issue #12885, Unordered consuming case in Key_Shared subscription
(#12890)
---
.../PersistentDispatcherMultipleConsumers.java | 3 +++
...istentStickyKeyDispatcherMultipleConsumers.java | 31 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9bf2db9..a240c0b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -83,6 +83,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
protected volatile boolean havePendingRead = false;
protected volatile boolean havePendingReplayRead = false;
+ protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
@@ -244,6 +245,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
havePendingReplayRead = true;
+ minReplayedPosition =
messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
Set<? extends Position> deletedMessages =
topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) :
asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
@@ -267,6 +269,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
consumerList.size());
}
havePendingRead = true;
+ minReplayedPosition =
getMessagesToReplayNow(1).stream().findFirst().orElse(null);
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead,
this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 6f4c4eb..5c8f33e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -170,6 +170,37 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return;
}
+ // A corner case in which we have to retry readMoreEntries in order to
preserve ordered delivery.
+ // This may happen when a consumer is closed. See issue #12885 for details.
+ if (!allowOutOfOrderDelivery) {
+ Set<PositionImpl> messagesToReplayNow =
this.getMessagesToReplayNow(1);
+ if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()
&& this.minReplayedPosition != null) {
+ PositionImpl relayPosition =
messagesToReplayNow.stream().findFirst().get();
+ // If relayPosition is a new entry with a smaller position that was
inserted for redelivery during this async
+ // read, it is possible that this relayPosition should be
dispatched to the consumer first. So in order to
+ // preserve ordered delivery, we need to discard this read
result and try to trigger a replay read
+ // that contains "relayPosition" by calling readMoreEntries.
+ if (relayPosition.compareTo(minReplayedPosition) < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Position {} (<{}) is inserted for
relay during current {} read, discard this "
+ + "read and retry with readMoreEntries.",
+ name, relayPosition, minReplayedPosition,
readType);
+ }
+ if (readType == ReadType.Normal) {
+ entries.forEach(entry -> {
+ long stickyKeyHash = getStickyKeyHash(entry);
+ addMessageToReplay(entry.getLedgerId(),
entry.getEntryId(), stickyKeyHash);
+ entry.release();
+ });
+ } else if (readType == ReadType.Replay) {
+ entries.forEach(Entry::release);
+ }
+ readMoreEntries();
+ return;
+ }
+ }
+ }
+
nextStuckConsumers.clear();
final Map<Consumer, List<Entry>> groupedEntries =
localGroupedEntries.get();