equanz commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1593651866


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -312,6 +365,62 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
             }
         }
 
+        // Update the last sent position and remove ranges from individuallySentPositions if necessary
+        if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+            final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
+            com.google.common.collect.Range<PositionImpl> range = individuallySentPositions.firstRange();
+
+            // If the upper bound is before the last sent position, we need to move ahead as these
+            // individuallySentPositions are now irrelevant.
+            if (range != null && range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
+                individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+                        lastSentPosition.getEntryId());
+                range = individuallySentPositions.firstRange();
+            }
+
+            if (range != null) {
+                // If the lowerBound is ahead of the last sent position,
+                // verify if there are any entries in-between.
+                if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 || managedLedger

Review Comment:
   > * Send `E6` to `c3`.
   >     * State `replay-queue`: `E7, E8`, `sentPositionList`: `[E5~E6]`, `lastSentPosition`: `E6 (recalculated after sending)`
   
   The `sentPositionList` and `lastSentPosition` are not cleared when a single consumer disconnects. They are cleared only when all consumers have disconnected.
   
https://github.com/equanz/pulsar/blob/c3b318af10b45d53c07cc2163a256ce98edf8e8b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L173-L179
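   
   To illustrate the reset rule described above, here is a minimal sketch. It is not the dispatcher code from the link: the class `DispatcherStateSketch`, its fields, and the use of plain `long` entry ids in place of `PositionImpl` are all assumptions made for illustration.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeSet;
   
   // Hypothetical stand-in for the dispatcher state; not the real Pulsar class.
   class DispatcherStateSketch {
       final List<String> consumers = new ArrayList<>();
       // Simplified: positions are modeled as entry ids only.
       final TreeSet<Long> individuallySentPositions = new TreeSet<>();
       Long lastSentPosition = null;
   
       void addConsumer(String name) {
           consumers.add(name);
       }
   
       void removeConsumer(String name) {
           consumers.remove(name);
           // The tracking state is dropped only once no consumer is left.
           if (consumers.isEmpty()) {
               lastSentPosition = null;
               individuallySentPositions.clear();
           }
       }
   
       // Disconnect one of two consumers and report whether the state survived.
       static boolean stateSurvivesSingleDisconnect() {
           DispatcherStateSketch d = new DispatcherStateSketch();
           d.addConsumer("c1");
           d.addConsumer("c2");
           d.lastSentPosition = 6L;
           d.individuallySentPositions.add(8L);
           d.removeConsumer("c1");
           return d.lastSentPosition != null && !d.individuallySentPositions.isEmpty();
       }
   }
   ```
   
   In this sketch, removing `c1` while `c2` is still connected leaves both fields untouched; removing `c2` afterwards resets them.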
   
   In this proposal (https://github.com/apache/pulsar/pull/20776), we care only about **messages that have not been dispatched**. We don't care about messages that have already been dispatched.
   
   * [issue-1]: Concerns messages that were **read but never attempted for dispatch**
   * [issue-2]: Concerns **skipped** messages
   
   I don't think we need to care about already dispatched messages, because the broker cannot fully control client-side redelivery operations.
   
   A simple example:
   (Assume that the selector returns `c2` for the redelivered messages.)
   ```
   c1: 3:1, 3:2
   c2:
   lastSentPosition: 3:2
   recentlyJoinedConsumers: {c3: 3:2}
   -> (unack 3:2)
   c1: 3:1
   c2: 3:2
   lastSentPosition: 3:2
   recentlyJoinedConsumers: {c3: 3:2}
   -> (unack 3:1)
   c1:
   c2: 3:2, 3:1  // out-of-order
   lastSentPosition: 3:2
   recentlyJoinedConsumers: {c3: 3:2}
   ```
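   
   The same sequence can be replayed with a small simulation. This is only a sketch under stated assumptions: `RedeliveryOrderSketch` and `redeliver` are hypothetical names, positions are plain strings, and an "unack" is modeled as moving the entry from `c1` to the consumer the selector picks (`c2`). It does not use any Pulsar API.
   
   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical model of the example above; not broker code.
   class RedeliveryOrderSketch {
       // Dispatched but unacknowledged entries per consumer, in delivery order.
       final Map<String, List<String>> pending = new LinkedHashMap<>();
       String lastSentPosition = "3:2";
   
       RedeliveryOrderSketch() {
           pending.put("c1", new ArrayList<>(List.of("3:1", "3:2")));
           pending.put("c2", new ArrayList<>());
       }
   
       // Models a client-side unack: the entry leaves `from` and is redelivered to `to`.
       void redeliver(String entry, String from, String to) {
           if (!pending.get(from).remove(entry)) {
               throw new IllegalArgumentException(entry + " is not pending on " + from);
           }
           pending.get(to).add(entry);
           // lastSentPosition is intentionally unchanged: nothing new was read.
       }
   
       // Replays the two unacks from the example and returns c2's delivery order.
       static List<String> demo() {
           RedeliveryOrderSketch s = new RedeliveryOrderSketch();
           s.redeliver("3:2", "c1", "c2"); // -> (unack 3:2)
           s.redeliver("3:1", "c1", "c2"); // -> (unack 3:1)
           return s.pending.get("c2");
       }
   
       public static void main(String[] args) {
           System.out.println("c2 receives: " + demo());
       }
   }
   ```
   
   The order in which `c2` receives the entries depends only on the order of the client-side unacks, which is why tracking `lastSentPosition` on the broker side cannot prevent it.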
   
   https://github.com/apache/pulsar/pull/21953#discussion_r1519261144


