equanz commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1519261144


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -122,15 +136,18 @@ public synchronized CompletableFuture<Void> 
addConsumer(Consumer consumer) {
                 })
         ).thenRun(() -> {
             synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) 
{
-                PositionImpl readPositionWhenJoining = (PositionImpl) 
cursor.getReadPosition();
-                consumer.setReadPositionWhenJoining(readPositionWhenJoining);
-                // If this was the 1st consumer, or if all the messages are 
already acked, then we
-                // don't need to do anything special
-                if (!allowOutOfOrderDelivery
-                        && recentlyJoinedConsumers != null
-                        && consumerList.size() > 1
-                        && 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
-                    recentlyJoinedConsumers.put(consumer, 
readPositionWhenJoining);
+                if (!allowOutOfOrderDelivery) {
+                    final PositionImpl lastSentPositionWhenJoining = 
updateIfNeededAndGetLastSentPosition();
+                    if (lastSentPositionWhenJoining != null) {
+                        
consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
+                        // If this was the 1st consumer, or if all the 
messages are already acked, then we
+                        // don't need to do anything special
+                        if (recentlyJoinedConsumers != null
+                                && consumerList.size() > 1
+                                && 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+                            recentlyJoinedConsumers.put(consumer, 
lastSentPositionWhenJoining);

Review Comment:
   > After consumers redelivery messages, you did not remove them out of 
`individuallySentPositions`, right? 
   
   Yes.
   
    >So the mechanism of "Calculate the lastest position sent" could not work 
as expected, right?
   
   If we also care about redelivery messages, then yes. However, I don't think 
we need to care about that fully. We can't control the order of client-side 
redelivery operations(e.g. request to redelivery x:1 after x:2).
   
   Of course, we should at least care about broker-side redelivery operations 
like the ones below.
   
https://github.com/apache/pulsar/blob/c36c18d44fd4df07bce9b7961c59685e0e91a420/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L196
   
https://github.com/apache/pulsar/blob/c36c18d44fd4df07bce9b7961c59685e0e91a420/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L279-L285
   
   > Suggestion: when a new consumer joins, we can calculate "the lastest 
position sent" in real-time relying on these variables:
   
   I'll consider about it. Thanks. My initial impression is,
   * Is it equivalent to the current approach?
   * Does it have a significant impact on performance?
   * Can useful stats be implemented?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to