poorbarcode commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1592766958
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -312,6 +365,62 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
}
}
+ // Update the last sent position and remove ranges from
individuallySentPositions if necessary
+ if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+ final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ com.google.common.collect.Range<PositionImpl> range =
individuallySentPositions.firstRange();
+
+ // If the upper bound is before the last sent position, we need to
move ahead as these
+ // individuallySentPositions are now irrelevant.
+ if (range != null &&
range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
+
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+ lastSentPosition.getEntryId());
+ range = individuallySentPositions.firstRange();
+ }
+
+ if (range != null) {
+ // If the lowerBound is ahead of the last sent position,
+ // verify if there are any entries in-between.
+ if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 ||
managedLedger
Review Comment:
@equanz Could you explain this case?
`msgs` on the topic.
- `L: E6` key: `key-a`
- `L: E7` key: `key-a`
- `L: E8` key: `key-a`
- `L: E9` key: `key-b`
- `L: E10` key: `key-b`
- `L: E11` key: `key-b`
---
The flow of sending messages
- `incoming-queue` of `c1` is `1`, `c2` is `1k`
- Send `E6` to `c1` and `E9~E11` to `c2`.
- State `replay-queue`: `E7, E8`, `sentPostionList`: `[E6, E9~E11]`,
`lastSentPistion`: `E5(mark-deleted position)`
- `c1` never acknowledge `E6`.
- Add new consumer `c3`, the selector will return `c3` if `key-a`
- State `lastSentPosition` of `c3` is `E5`
- Disconnect c1.
- Next reading: `[E6~E7]`
- Send nothing to `c3`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]