equanz commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1593651866
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -312,6 +365,62 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
}
}
+ // Update the last sent position and remove ranges from
individuallySentPositions if necessary
+ if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+ final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ com.google.common.collect.Range<PositionImpl> range =
individuallySentPositions.firstRange();
+
+ // If the upper bound is before the last sent position, we need to
move ahead as these
+ // individuallySentPositions are now irrelevant.
+ if (range != null &&
range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
+
individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+ lastSentPosition.getEntryId());
+ range = individuallySentPositions.firstRange();
+ }
+
+ if (range != null) {
+ // If the lowerBound is ahead of the last sent position,
+ // verify if there are any entries in-between.
+ if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 ||
managedLedger
Review Comment:
> * Send `E6` to `c3`.
> * State `replay-queue`: `E7, E8`, `sentPostionList`: `[E5~E6]`,
`lastSentPistion`: `E6(re-calculate after sent)`
The `sentPostionList` and `lastSentPistion` are not cleared by disconnecting
a consumer. These are cleared when all consumers are disconnected.
https://github.com/equanz/pulsar/blob/c3b318af10b45d53c07cc2163a256ce98edf8e8b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L173-L179
In this proposal (https://github.com/apache/pulsar/pull/20776) , we care
only **not dispatched messages**. We don't care about already dispatched
messages.
* [issue-1]: Care about **read but not try to dispatched** messages
* [issue-2]: Care about **skipped** messages
I don't think we need to care about already dispatched messages. Because we
can't control client-side redeliver operations from broker-side fully.
simple example:
(Assume that the selector will return `c2`)
```
c1: 3:1, 3:2
c2:
lastSentPosition: 3:2
recentlyJoinedConsumers: {c3: 3:2}
-> (unack 3:2)
c1: 3:1
c2: 3:2
lastSentPosition: 3:2
recentlyJoinedConsumers: {c3: 3:2}
-> (unack 3:1)
c1:
c2: 3:2, 3:1 // out-of-order
lastSentPosition: 3:2
recentlyJoinedConsumers: {c3: 3:2}
```
https://github.com/apache/pulsar/pull/21953#discussion_r1519261144
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]