poorbarcode commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1593388517
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -312,6 +365,62 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
             }
         }
+        // Update the last sent position and remove ranges from individuallySentPositions if necessary
+        if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+            final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
+            com.google.common.collect.Range<PositionImpl> range = individuallySentPositions.firstRange();
+
+            // If the upper bound is before the last sent position, we need to move ahead as these
+            // individuallySentPositions are now irrelevant.
+            if (range != null && range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
+                individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+                        lastSentPosition.getEntryId());
+                range = individuallySentPositions.firstRange();
+            }
+
+            if (range != null) {
+                // If the lowerBound is ahead of the last sent position,
+                // verify if there are any entries in-between.
+                if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 || managedLedger
Review Comment:
---
Messages (`msgs`) on the topic:
- `L: E6` key: `key-a`
- `L: E7` key: `key-a`
- `L: E8` key: `key-a`
- `L: E9` key: `key-b`
- `L: E10` key: `key-b`
- `L: E11` key: `key-b`
---
The flow of sending messages:
- `incoming-queue` of both `c1` and `c2` is `1k`
- Send `E6~E8` to `c1` and `E9~E11` to `c2`.
- State `replay-queue`: `[]`, `sentPositionList`: `[]`, `lastSentPosition`: `E11`
- `c1` never acknowledges.
- Add a new consumer `c3`; the selector now returns `c3` for `key-a`. The `incoming-queue` of `c3` is `1`.
- State `lastSentPosition` of `c3` is `E11`
- Disconnect `c1`.
- Next reading: `[E6~E8]`
- Send `E6` to `c3`.
- State `replay-queue`: `[E7, E8]`, `sentPositionList`: `[E5~E6]`, `lastSentPosition`: `E6` (recalculated after sending)
- Add a new consumer `c4`; the selector now returns `c4` for `key-a`. The `incoming-queue` of `c4` is `1k`.
- State `lastSentPosition` of `c4` is `E6`
- Disconnect `c3`.
- Next reading: `[E6~E8]`
- Send `E6` to `c4`.
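
To make the state transitions above easier to trace, here is a minimal sketch of the kind of bookkeeping the hunk performs. It is not the PR's code. It assumes a single ledger, so positions are plain `long` entry IDs, and it stores open-closed ranges `(lower, upper]` in a `TreeMap`. The class `LastSentSketch` and its methods `addSent` and `advance` are hypothetical names. The `managedLedger` check from the hunk is left out because the quoted condition is truncated, and advancing the last sent position to the range's upper endpoint is my assumption about what follows it.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model: one ledger, positions are entry IDs.
class LastSentSketch {
    // Open-closed ranges (lower, upper] of sent entry IDs, keyed by lower bound.
    final TreeMap<Long, Long> sentRanges = new TreeMap<>();
    long lastSent;

    LastSentSketch(long lastSent) {
        this.lastSent = lastSent;
    }

    // Record that entryId was sent, i.e. add (entryId - 1, entryId] and merge neighbours.
    void addSent(long entryId) {
        long lower = entryId - 1;
        long upper = entryId;
        Map.Entry<Long, Long> prev = sentRanges.floorEntry(lower);
        if (prev != null && prev.getValue() >= lower) {
            // The previous range touches or covers the new one: extend it.
            lower = prev.getKey();
            upper = Math.max(upper, prev.getValue());
        }
        Long nextUpper = sentRanges.get(upper);
        if (nextUpper != null) {
            // A range starts exactly where the new one ends: absorb it.
            sentRanges.remove(upper);
            upper = nextUpper;
        }
        sentRanges.put(lower, upper);
    }

    // Drop stale ranges, then advance lastSent if the first range is contiguous with it.
    void advance() {
        // Ranges that end at or before lastSent carry no information any more.
        while (!sentRanges.isEmpty() && sentRanges.firstEntry().getValue() <= lastSent) {
            sentRanges.pollFirstEntry();
        }
        // Assumed behaviour: a range starting at or before lastSent moves lastSent forward.
        if (!sentRanges.isEmpty() && sentRanges.firstKey() <= lastSent) {
            lastSent = sentRanges.pollFirstEntry().getValue();
        }
    }

    public static void main(String[] args) {
        LastSentSketch s = new LastSentSketch(5);
        s.addSent(7);
        s.addSent(8);
        s.advance(); // E6 is still missing, so lastSent cannot move
        System.out.println("lastSent=" + s.lastSent + " ranges=" + s.sentRanges);
        s.addSent(6);
        s.advance(); // the gap is closed, so lastSent jumps to the end of the range
        System.out.println("lastSent=" + s.lastSent + " ranges=" + s.sentRanges);
    }
}
```

In this simplified model the last sent position only ever moves forward, and only across a contiguous run of sent entries. That makes it a useful baseline when checking whether the states listed in the flow above are the intended ones.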