poorbarcode commented on code in PR #23352:
URL: https://github.com/apache/pulsar/pull/23352#discussion_r1792067295


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -316,60 +294,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
-        // Update the last sent position and remove ranges from individuallySentPositions if necessary
-        if (recentlyJoinedConsumerTrackingRequired && lastSentPosition != null) {
-            final ManagedLedger managedLedger = cursor.getManagedLedger();
-            com.google.common.collect.Range<Position> range = individuallySentPositions.firstRange();
-
-            // If the upper bound is before the last sent position, we need to move ahead as these
-            // individuallySentPositions are now irrelevant.
-            if (range != null && range.upperEndpoint().compareTo(lastSentPosition) <= 0) {
-                individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
-                        lastSentPosition.getEntryId());
-                range = individuallySentPositions.firstRange();
-            }
-
-            if (range != null) {
-                // If the lowerBound is ahead of the last sent position,
-                // verify if there are any entries in-between.
-                if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 || managedLedger
-                        .getNumberOfEntries(com.google.common.collect.Range.openClosed(lastSentPosition,
-                                range.lowerEndpoint())) <= 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Found a position range to last sent: {}", name, range);
-                    }
-                    Position newLastSentPosition = range.upperEndpoint();
-                    Position positionAfterNewLastSent = managedLedger
-                            .getNextValidPosition(newLastSentPosition);
-                    // sometime ranges are connected but belongs to different ledgers
-                    // so, they are placed sequentially
-                    // eg: (2:10..3:15] can be returned as (2:10..2:15],[3:0..3:15].
-                    // So, try to iterate over connected range and found the last non-connected range
-                    // which gives new last sent position.
-                    final Position lastConfirmedEntrySnapshot = managedLedger.getLastConfirmedEntry();
-                    if (lastConfirmedEntrySnapshot != null) {
-                        while (positionAfterNewLastSent.compareTo(lastConfirmedEntrySnapshot) <= 0) {
-                            if (individuallySentPositions.contains(positionAfterNewLastSent.getLedgerId(),
-                                    positionAfterNewLastSent.getEntryId())) {
-                                range = individuallySentPositions.rangeContaining(
-                                        positionAfterNewLastSent.getLedgerId(), positionAfterNewLastSent.getEntryId());
-                                newLastSentPosition = range.upperEndpoint();
-                                positionAfterNewLastSent = managedLedger.getNextValidPosition(newLastSentPosition);
-                                // check if next valid position is also deleted and part of the deleted-range
-                                continue;
-                            }
-                            break;
-                        }
-                    }
-
-                    if (lastSentPosition.compareTo(newLastSentPosition) < 0) {
-                        lastSentPosition = newLastSentPosition;
-                    }
-                    individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(),
-                            lastSentPosition.getEntryId());
-                }
-            }
-        }
 
         lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;

Review Comment:
   > ```java
   > // line-281
   > totalEntriesProcessed += entriesForConsumer.size();
   > // line-282
   > consumer.sendMessages(entriesForConsumer, batchSizes, batchIndexesAcks,
   >        sendMessageInfo.getTotalMessages(),
   >        sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
   > ...
   > // line-298
   > this.lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
   > ```
   
   Since `consumer.sendMessages` can filter out entries through the `consumer.pendingAcks.addPendingAckIfAllowed` mechanism, `lastNumberOfEntriesProcessed` may end up larger than the number of entries actually sent.
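   A minimal toy sketch of the over-count (hypothetical names, not Pulsar's actual classes): the counter is bumped by the full batch size before the filtering that `sendMessages` performs, so the recorded value overstates what was actually delivered.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class EntriesProcessedSketch {
       // Stand-in for consumer.pendingAcks.addPendingAckIfAllowed:
       // pretend odd entry ids are rejected (e.g. their hash is draining).
       static boolean addPendingAckIfAllowed(int entryId) {
           return entryId % 2 == 0;
       }
   
       public static void main(String[] args) {
           List<Integer> entriesForConsumer = List.of(1, 2, 3, 4);
   
           // Counted BEFORE filtering, as around line 281 of the quoted snippet.
           long totalEntriesProcessed = entriesForConsumer.size();
   
           // sendMessages then silently drops entries whose pending ack is not allowed.
           List<Integer> actuallySent = new ArrayList<>();
           for (int entryId : entriesForConsumer) {
               if (addPendingAckIfAllowed(entryId)) {
                   actuallySent.add(entryId);
               }
           }
   
           System.out.println(totalEntriesProcessed); // 4: what gets recorded
           System.out.println(actuallySent.size());   // 2: what was actually sent
       }
   }
   ```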



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -131,60 +131,74 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
             consumer.disconnect();
             return CompletableFuture.completedFuture(null);
         }
-        return super.addConsumer(consumer).thenCompose(__ ->
-                selector.addConsumer(consumer).handle((result, ex) -> {
-                    if (ex != null) {
-                        synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
-                            consumerSet.removeAll(consumer);
-                            consumerList.remove(consumer);
-                        }
-                        throw FutureUtil.wrapToCompletionException(ex);
+        return super.addConsumer(consumer).thenCompose(__ -> selector.addConsumer(consumer))
+                .thenAccept(impactedConsumers -> {
+            // TODO: Add some way to prevent changes in between the time the consumer is added and the
+            // time the draining hashes are applied. It might be fine for ConsistentHashingStickyKeyConsumerSelector
+            // since it's not really asynchronous, although it returns a CompletableFuture
+            if (drainingHashesRequired) {
+                consumer.setPendingAcksAddHandler(this::handleAddingPendingAck);
+                consumer.setPendingAcksRemoveHandler(new PendingAcksMap.PendingAcksRemoveHandler() {
+                    @Override
+                    public void handleRemoving(Consumer consumer, long ledgerId, long entryId, int stickyKeyHash,
+                                               boolean closing) {
+                        drainingHashesTracker.reduceRefCount(consumer, stickyKeyHash, closing);
                     }
-                    return result;
-                })
-        ).thenRun(() -> {
-            synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
-                if (recentlyJoinedConsumerTrackingRequired) {
-                    final Position lastSentPositionWhenJoining = updateIfNeededAndGetLastSentPosition();
-                    if (lastSentPositionWhenJoining != null) {
-                        consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
-                        // If this was the 1st consumer, or if all the messages are already acked, then we
-                        // don't need to do anything special
-                        if (recentlyJoinedConsumers != null
-                                && consumerList.size() > 1
-                                && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
-                            recentlyJoinedConsumers.put(consumer, lastSentPositionWhenJoining);
-                        }
+
+                    @Override
+                    public void startBatch() {
+                        drainingHashesTracker.startBatch();
                     }
-                }
+
+                    @Override
+                    public void endBatch() {
+                        drainingHashesTracker.endBatch();
+                    }
+                });
+                registerDrainingHashes(consumer, impactedConsumers);
+            }
+        }).exceptionally(ex -> {
+            internalRemoveConsumer(consumer);
+            throw FutureUtil.wrapToCompletionException(ex);
+        });
+    }
+
+    private synchronized void registerDrainingHashes(Consumer skipConsumer,
+                                                     ImpactedConsumersResult impactedConsumers) {
+        impactedConsumers.processRemovedHashRanges((c, removedHashRanges) -> {
+            if (c != skipConsumer) {
+                c.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
+                    if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) {
+                        log.warn("[{}] Sticky key hash was missing for {}:{}", getName(), ledgerId, entryId);
+                        return;
+                    }
+                    if (removedHashRanges.containsStickyKey(stickyKeyHash)) {
+                        // add the pending ack to the draining hashes tracker if the hash is in the range
+                        drainingHashesTracker.addEntry(c, stickyKeyHash);

Review Comment:
   **Background**
   - `drainingHashesTracker.addEntry(c, stickyKeyHash)` increments `DrainingHashEntry.refCount`.
   
   **Issue**: `DrainingHashEntry.refCount` is incremented repeatedly in the following scenario:
   - Register `C1`, assign `k1` to `C1`
   - Register `C2`, move `k1` to `C2`
     - `DrainingHashEntry.refCount` increases
   - Register `C3`, move `k1` to `C3`
     - `DrainingHashEntry.refCount` increases again, although only one entry is still pending for `k1`
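   The double count can be sketched with a toy model (hypothetical class, not Pulsar's actual `DrainingHashesTracker`): each hand-off of the hash to a newly joined consumer calls `addEntry` again, so the ref count for a single still-pending entry reaches 2 instead of 1.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class RefCountSketch {
       // Stand-in for DrainingHashEntry.refCount, keyed by sticky key hash.
       static final Map<Integer, Integer> refCountByHash = new HashMap<>();
   
       // Stand-in for drainingHashesTracker.addEntry: each call bumps the ref count.
       static void addEntry(int stickyKeyHash) {
           refCountByHash.merge(stickyKeyHash, 1, Integer::sum);
       }
   
       public static void main(String[] args) {
           int k1 = 42;  // sticky key hash of key "k1"
           addEntry(k1); // C2 joins: k1 moves from C1 to C2
           addEntry(k1); // C3 joins: k1 moves from C2 to C3
           // The same pending entry has now been counted twice.
           System.out.println(refCountByHash.get(k1)); // prints 2
       }
   }
   ```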



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
