This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b505a0f3ce5 Dispatcher did unnecessary sort for 
recentlyJoinedConsumers and printed noisy error logs (#24634)
b505a0f3ce5 is described below

commit b505a0f3ce52d0117243390f1284ae42651019e5
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Tue Sep 9 19:17:02 2025 +0800

    Dispatcher did unnecessary sort for recentlyJoinedConsumers and printed 
noisy error logs (#24634)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 42 ++++++++++----
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 64 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 14af67b4573..da59e8c6df2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -85,11 +85,26 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
     public boolean sortRecentlyJoinedConsumersIfNeeded = true;
 
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
-            Subscription subscription, ServiceConfiguration conf, 
KeySharedMeta ksm) {
+                                                          Subscription 
subscription, ServiceConfiguration conf,
+                                                          KeySharedMeta ksm) {
+        this(topic, cursor, subscription, conf, ksm, null);
+    }
+
+    /**
+     * @param recentlyJoinedConsumers This parameter is only used for testing.
+     */
+    @VisibleForTesting
+    PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
+            Subscription subscription, ServiceConfiguration conf, 
KeySharedMeta ksm,
+                                                   LinkedHashMap<Consumer, 
PositionImpl> recentlyJoinedConsumers) {
         super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
 
         this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
-        this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new 
LinkedHashMap<>();
+        if (recentlyJoinedConsumers == null) {
+            this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : 
new LinkedHashMap<>();
+        } else {
+            this.recentlyJoinedConsumers = recentlyJoinedConsumers;
+        }
         this.keySharedMode = ksm.getKeySharedMode();
         switch (this.keySharedMode) {
         case AUTO_SPLIT:
@@ -154,6 +169,10 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         });
     }
 
+    /**
+     * Sort items in the collection "recentlyJoinedConsumers" if needed.
+     * The order is re-checked each time a consumer joins, so only the last two items need to be compared.
+     */
     private void sortRecentlyJoinedConsumersIfNeeded() {
         if (!sortRecentlyJoinedConsumersIfNeeded) {
             return;
@@ -161,20 +180,21 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         if (recentlyJoinedConsumers.size() == 1) {
             return;
         }
-        // Since we check the order of queue after each consumer joined, we 
can only check the last two items.
         boolean sortNeeded = false;
-        PositionImpl posPre = null;
-        PositionImpl posAfter = null;
+        PositionImpl secondLatest = null;
+        PositionImpl latest = null;
         for (Map.Entry<Consumer, PositionImpl> entry : 
recentlyJoinedConsumers.entrySet()) {
-            if (posPre == null) {
-                posPre = entry.getValue();
+            if (secondLatest == null) {
+                secondLatest = entry.getValue();
+            } else if (latest == null) {
+                latest = entry.getValue();
             } else {
-                posPre = posAfter;
-                posAfter = entry.getValue();
+                secondLatest = latest;
+                latest = entry.getValue();
             }
         }
-        if (posPre != null && posAfter != null) {
-            if (posPre.compareTo(posAfter) > 0) {
+        if (secondLatest != null && latest != null) {
+            if (secondLatest.compareTo(latest) > 0) {
                 sortNeeded = true;
             }
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 9b7c98cc30e..20ffcff90ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.anySet;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -46,6 +47,7 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -299,6 +301,68 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         persistentDispatcher.close();
     }
 
+    @Test
+    public void testSkipSortRecentlyJoinedConsumersIfNotNeeded() throws 
Exception {
+        // Inject a sorting counter: each clear() call on the spied map is counted as one sort.
+        LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = new 
LinkedHashMap<>();
+        LinkedHashMap<Consumer, PositionImpl> spyRecentlyJoinedConsumers = 
spy(recentlyJoinedConsumers);
+        AtomicInteger sortTimes = new AtomicInteger(0);
+        doAnswer(invocationOnMock -> {
+            sortTimes.incrementAndGet();
+            return invocationOnMock.callRealMethod();
+        }).when(spyRecentlyJoinedConsumers).clear();
+
+        PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher =
+                new PersistentStickyKeyDispatcherMultipleConsumers(
+                topicMock, cursorMock, subscriptionMock, configMock,
+                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), 
spyRecentlyJoinedConsumers);
+
+        Consumer consumer0 = createMockConsumer();
+        when(consumer0.consumerName()).thenReturn("0");
+        Consumer consumer1 = createMockConsumer();
+        when(consumer1.consumerName()).thenReturn("MzGG2");
+        Consumer consumer2 = createMockConsumer();
+        when(consumer2.consumerName()).thenReturn("rMOYG");
+        Consumer consumer3 = createMockConsumer();
+        when(consumer3.consumerName()).thenReturn("QIleA");
+
+        
when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L);
+        
when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionImpl.get(-1, -1));
+        persistentDispatcher.addConsumer(consumer0).join();
+
+        when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(1, 1));
+        persistentDispatcher.addConsumer(consumer1).join();
+
+        when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(1, 1));
+        persistentDispatcher.addConsumer(consumer2).join();
+
+        when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(1, 2));
+        persistentDispatcher.addConsumer(consumer3).join();
+
+        assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(), 
3);
+
+        Iterator<Map.Entry<Consumer, PositionImpl>> itr =
+                
persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator();
+
+        Map.Entry<Consumer, PositionImpl> entry1 = itr.next();
+        assertEquals(entry1.getValue(), PositionImpl.get(1, 1));
+        assertEquals(entry1.getKey(), consumer1);
+
+        Map.Entry<Consumer, PositionImpl> entry2 = itr.next();
+        assertEquals(entry2.getValue(), PositionImpl.get(1, 1));
+        assertEquals(entry2.getKey(), consumer2);
+
+        Map.Entry<Consumer, PositionImpl> entry3 = itr.next();
+        assertEquals(entry3.getValue(), PositionImpl.get(1, 2));
+        assertEquals(entry3.getKey(), consumer3);
+
+        // Verify: no sorting was executed
+        assertEquals(sortTimes.get(), 0);
+
+        // cleanup.
+        persistentDispatcher.close();
+    }
+
     @Test
     public void testSendMarkerMessage() {
         try {

Reply via email to