This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d4261d23824 [improve][broker] Don't use "recently joined consumers" 
rules for Key_Shared in STICKY mode (#23275)
d4261d23824 is described below

commit d4261d23824412da12e27a56c0fa30250bbe6378
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Sep 19 11:14:11 2024 +0300

    [improve][broker] Don't use "recently joined consumers" rules for 
Key_Shared in STICKY mode (#23275)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d45b9394dd7..26463ba902c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -66,6 +66,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
 
     private final boolean allowOutOfOrderDelivery;
     private final StickyKeyConsumerSelector selector;
+    private final boolean recentlyJoinedConsumerTrackingRequired;
 
     private boolean skipNextReplayToTriggerLookAhead = false;
     private final KeySharedMode keySharedMode;
@@ -90,10 +91,15 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
 
         this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
-        this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new 
LinkedHashMap<>();
-        this.individuallySentPositions =
-                allowOutOfOrderDelivery ? null : new 
ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);
         this.keySharedMode = ksm.getKeySharedMode();
+        // recent joined consumer tracking is required only for AUTO_SPLIT 
mode when out-of-order delivery is disabled
+        this.recentlyJoinedConsumerTrackingRequired =
+                keySharedMode == KeySharedMode.AUTO_SPLIT && 
!allowOutOfOrderDelivery;
+        this.recentlyJoinedConsumers = recentlyJoinedConsumerTrackingRequired 
? new LinkedHashMap<>() : null;
+        this.individuallySentPositions =
+                recentlyJoinedConsumerTrackingRequired
+                        ? new ConcurrentOpenLongPairRangeSet<>(4096, 
positionRangeConverter)
+                        : null;
         switch (this.keySharedMode) {
         case AUTO_SPLIT:
             if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
@@ -138,7 +144,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 })
         ).thenRun(() -> {
             synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) 
{
-                if (!allowOutOfOrderDelivery) {
+                if (recentlyJoinedConsumerTrackingRequired) {
                     final Position lastSentPositionWhenJoining = 
updateIfNeededAndGetLastSentPosition();
                     if (lastSentPositionWhenJoining != null) {
                         
consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
@@ -165,7 +171,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         // eventually causing all consumers to get stuck.
         selector.removeConsumer(consumer);
         super.removeConsumer(consumer);
-        if (!allowOutOfOrderDelivery && recentlyJoinedConsumers != null) {
+        if (recentlyJoinedConsumerTrackingRequired) {
             recentlyJoinedConsumers.remove(consumer);
             if (consumerList.size() == 1) {
                 recentlyJoinedConsumers.clear();
@@ -231,7 +237,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                     }
                 }
             }
+        }
 
+        if (recentlyJoinedConsumerTrackingRequired) {
             // Update if the markDeletePosition move forward
             updateIfNeededAndGetLastSentPosition();
 
@@ -273,7 +281,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                     redeliveryMessages.remove(entry.getLedgerId(), 
entry.getEntryId());
                 }
                 // Add positions to individuallySentPositions if necessary
-                if (!allowOutOfOrderDelivery) {
+                if (recentlyJoinedConsumerTrackingRequired) {
                     final Position position = entry.getPosition();
                     // Store to individuallySentPositions even if 
lastSentPosition is null
                     if ((lastSentPosition == null || 
position.compareTo(lastSentPosition) > 0)
@@ -306,7 +314,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         }
 
         // Update the last sent position and remove ranges from 
individuallySentPositions if necessary
-        if (!allowOutOfOrderDelivery && lastSentPosition != null) {
+        if (recentlyJoinedConsumerTrackingRequired && lastSentPosition != 
null) {
             final ManagedLedger managedLedger = cursor.getManagedLedger();
             com.google.common.collect.Range<Position> range = 
individuallySentPositions.firstRange();
 

Reply via email to