equanz commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1593246433


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -312,6 +365,62 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
             }
         }
 
+        // Update the last sent position and remove ranges from individuallySentPositions if necessary
+        if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+            final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
+            com.google.common.collect.Range<PositionImpl> range = individuallySentPositions.firstRange();
+
+            // If the upper bound is before the last sent position, we need to move ahead as these
+            // individuallySentPositions are now irrelevant.
+            if (range != null && range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
+                individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
+                        lastSentPosition.getEntryId());
+                range = individuallySentPositions.firstRange();
+            }
+
+            if (range != null) {
+                // If the lowerBound is ahead of the last sent position,
+                // verify if there are any entries in-between.
+                if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 || managedLedger

Review Comment:
   > * Send `E6` to `c1` and `E9~E11` to `c2`.
   >     * State `replay-queue`: `E7, E8`, `sentPositionList`: `[E6, E9~E11]`, `lastSentPosition`: `E5(mark-deleted position)`
   
   More precisely, at this step `sentPositionList` is `[(E5, E6], (E8, E11]]`, so its first range is `(E5, E6]`.
   The lower endpoint (`E5`) is less than or equal to the current `lastSentPosition` (`E5`), so `lastSentPosition` is updated to the upper endpoint (`E6`).
   
   
https://github.com/equanz/pulsar/blob/c3b318af10b45d53c07cc2163a256ce98edf8e8b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L371-L421
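
To make the step above concrete, here is a minimal, self-contained sketch of the range check. It is not the PR's code: the class `LastSentPositionSketch`, the method `advance`, the use of plain `long` entry IDs instead of `PositionImpl`, and the `TreeMap` standing in for `individuallySentPositions` are all assumptions made for illustration. It also assumes the consumed range is dropped after the update, and it leaves out the managed-ledger check for entries between the two positions.

```java
import java.util.Map;
import java.util.TreeMap;

public class LastSentPositionSketch {

    /**
     * Hypothetical helper. sentRanges maps the open lower endpoint of each
     * range to its closed upper endpoint, so {5=6, 8=11} stands for
     * [(E5, E6], (E8, E11]].
     */
    static long advance(long lastSent, TreeMap<Long, Long> sentRanges) {
        // Drop ranges that end at or before lastSent; they are no longer relevant.
        while (!sentRanges.isEmpty() && sentRanges.firstEntry().getValue() <= lastSent) {
            sentRanges.pollFirstEntry();
        }

        // If the first remaining range starts at or before lastSent, it is
        // contiguous with what was already sent, so move lastSent to its upper endpoint.
        Map.Entry<Long, Long> first = sentRanges.firstEntry();
        if (first != null && first.getKey() <= lastSent) {
            lastSent = first.getValue();
            sentRanges.pollFirstEntry(); // assumption: the consumed range is removed
        }
        return lastSent;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> sent = new TreeMap<>();
        sent.put(5L, 6L);   // (E5, E6]
        sent.put(8L, 11L);  // (E8, E11]

        long lastSent = advance(5L, sent); // lastSentPosition starts at E5

        System.out.println("lastSentPosition = E" + lastSent); // prints "lastSentPosition = E6"
        System.out.println("remaining ranges = " + sent);      // prints "remaining ranges = {8=11}"
    }
}
```

With `E7` and `E8` still in the replay queue, `(E8, E11]` is not contiguous with `E6`, so in this sketch `lastSentPosition` stays at `E6` until those entries are sent.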
   
   > * Add new consumer `c3`, the selector will return `c3` if `key-a`
   >     * State `lastSentPosition` of `c3` is `E5`
   
   Given the above, the `lastSentPosition` of `c3` (its joined position) is `E6`.
   
   > * Send nothing to `c3`.
   
   So `E6` is sent to `c3`.
   
   
https://github.com/equanz/pulsar/blob/c3b318af10b45d53c07cc2163a256ce98edf8e8b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L495-L501



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.