poorbarcode commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1515671343


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -219,6 +242,24 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
                     }
                 }
             }
+
+            // Update if the markDeletePosition move forward
+            updateIfNeededAndGetLastSentPosition();
+
+            // Should not access to individualDeletedMessages from outside 
managed cursor
+            // because it doesn't guarantee thread safety.
+            if (lastSentPosition == null) {

Review Comment:
   Just mark this for easier reading: initialize `lastSentPosition & 
individuallySentPositions ` after the dispatcher created or the first consumer 
joined.
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -308,6 +361,62 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
             }
         }
 
+        // Update the last sent position and remove ranges from 
individuallySentPositions if necessary
+        if (!allowOutOfOrderDelivery && lastSentPosition != null) {

Review Comment:
   Just mark this for easier reading: update `lastSentPosition` to the position 
of the first consecutive sent message



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -122,15 +136,18 @@ public synchronized CompletableFuture<Void> 
addConsumer(Consumer consumer) {
                 })
         ).thenRun(() -> {
             synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) 
{
-                PositionImpl readPositionWhenJoining = (PositionImpl) 
cursor.getReadPosition();
-                consumer.setReadPositionWhenJoining(readPositionWhenJoining);
-                // If this was the 1st consumer, or if all the messages are 
already acked, then we
-                // don't need to do anything special
-                if (!allowOutOfOrderDelivery
-                        && recentlyJoinedConsumers != null
-                        && consumerList.size() > 1
-                        && 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
-                    recentlyJoinedConsumers.put(consumer, 
readPositionWhenJoining);
+                if (!allowOutOfOrderDelivery) {
+                    final PositionImpl lastSentPositionWhenJoining = 
updateIfNeededAndGetLastSentPosition();
+                    if (lastSentPositionWhenJoining != null) {
+                        
consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
+                        // If this was the 1st consumer, or if all the 
messages are already acked, then we
+                        // don't need to do anything special
+                        if (recentlyJoinedConsumers != null
+                                && consumerList.size() > 1
+                                && 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+                            recentlyJoinedConsumers.put(consumer, 
lastSentPositionWhenJoining);

Review Comment:
   After consumers redelivery messages, you did not remove them out of 
`individuallySentPositions`, right? So the mechanism of "Calculate the lastest 
position sent" could not work as expected, right?
   
   Suggestion: when a new consumer joins, we can calculate "the lastest 
position sent" in real-time relying on these variables:
   - `cursor.individualDeletedMessages`
   - `dispatcher.redeliveryMessages`
   - `consumer.pendingAcks`
   
   Then the new variable `individuallySentPositions ` can be removed, and we do 
not need to update it in-time, which makes the logic simpler



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -276,12 +317,24 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
             }
 
             if (messagesForC > 0) {
-                // remove positions first from replay list first : 
sendMessages recycles entries
-                if (readType == ReadType.Replay) {
-                    for (int i = 0; i < messagesForC; i++) {
-                        Entry entry = entriesWithSameKey.get(i);
+                final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
+                for (int i = 0; i < messagesForC; i++) {
+                    final Entry entry = entriesWithSameKey.get(i);
+                    // remove positions first from replay list first : 
sendMessages recycles entries
+                    if (readType == ReadType.Replay) {
                         redeliveryMessages.remove(entry.getLedgerId(), 
entry.getEntryId());
                     }
+                    // Add positions to individuallySentPositions if necessary
+                    if (!allowOutOfOrderDelivery) {

Review Comment:
   Just mark this for easier reading: record the positions sent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to