poorbarcode commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1593297796
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -312,6 +365,62 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
}
}
+ // Update the last sent position and remove ranges from
individuallySentPositions if necessary
+ if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+ final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ com.google.common.collect.Range<PositionImpl> range =
individuallySentPositions.firstRange();
+
+ // If the upper bound is before the last sent position, we need to
move ahead as these
+ // individuallySentPositions are now irrelevant.
+ if (range != null &&
range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
+
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+ lastSentPosition.getEntryId());
+ range = individuallySentPositions.firstRange();
+ }
+
+ if (range != null) {
+ // If the lowerBound is ahead of the last sent position,
+ // verify if there are any entries in-between.
+ if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 ||
managedLedger
Review Comment:
OK. Correct the flow.
`msgs` on the topic.
- `L: E6` key: `key-a`
- `L: E7` key: `key-a`
- `L: E8` key: `key-a`
- `L: E9` key: `key-b`
- `L: E10` key: `key-b`
- `L: E11` key: `key-b`
---
The flow of sending messages
- `incoming-queue` of `c1` is `1`, `c2` is `1k`
- Send `E6` to `c1` and `E9~E11` to `c2`.
- State `replay-queue`: `E7, E8`, `sentPostionList`: `[E5~E6, E8~E11]`,
`lastSentPistion`: `E6(re-calculate after sent)`
- `c1` never acknowledges `E6`.
- Add new consumer `c3`, the selector will return `c3` if `key-a`
- State `lastSentPosition` of `c3` is `E6`
- Disconnect c1.
- Next reading: `[E6~E7]`
- Send `E6` to `c3`.
- State `replay-queue`: `E7, E8`, `sentPostionList`: `[E8~E11]`,
`lastSentPistion`: `E6`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -312,6 +365,62 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
}
}
+ // Update the last sent position and remove ranges from
individuallySentPositions if necessary
+ if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+ final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ com.google.common.collect.Range<PositionImpl> range =
individuallySentPositions.firstRange();
+
+ // If the upper bound is before the last sent position, we need to
move ahead as these
+ // individuallySentPositions are now irrelevant.
+ if (range != null &&
range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
+
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+ lastSentPosition.getEntryId());
+ range = individuallySentPositions.firstRange();
+ }
+
+ if (range != null) {
+ // If the lowerBound is ahead of the last sent position,
+ // verify if there are any entries in-between.
+ if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 ||
managedLedger
Review Comment:
OK. Correct the flow.
---
`msgs` on the topic.
- `L: E6` key: `key-a`
- `L: E7` key: `key-a`
- `L: E8` key: `key-a`
- `L: E9` key: `key-b`
- `L: E10` key: `key-b`
- `L: E11` key: `key-b`
---
The flow of sending messages
- `incoming-queue` of `c1` is `1`, `c2` is `1k`
- Send `E6` to `c1` and `E9~E11` to `c2`.
- State `replay-queue`: `E7, E8`, `sentPostionList`: `[E5~E6, E8~E11]`,
`lastSentPistion`: `E6(re-calculate after sent)`
- `c1` never acknowledges `E6`.
- Add new consumer `c3`, the selector will return `c3` if `key-a`
- State `lastSentPosition` of `c3` is `E6`
- Disconnect c1.
- Next reading: `[E6~E7]`
- Send `E6` to `c3`.
- State `replay-queue`: `E7, E8`, `sentPostionList`: `[E8~E11]`,
`lastSentPistion`: `E6`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]