This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c2940b3c13c [fix][broker]Dispatcher did unnecessary sort for 
recentlyJoinedConsumers and printed noisy error logs (#24634)
c2940b3c13c is described below

commit c2940b3c13cef4d935655d9a7b117a43471bbed1
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Tue Sep 9 19:17:02 2025 +0800

    [fix][broker]Dispatcher did unnecessary sort for recentlyJoinedConsumers 
and printed noisy error logs (#24634)
---
 ...tickyKeyDispatcherMultipleConsumersClassic.java | 41 ++++++++++----
 ...yKeyDispatcherMultipleConsumersClassicTest.java | 64 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
index 56161d8dd15..c3b246fe9ba 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
@@ -58,6 +58,7 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,10 +96,25 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassic
     PersistentStickyKeyDispatcherMultipleConsumersClassic(PersistentTopic 
topic, ManagedCursor cursor,
                                                           Subscription 
subscription, ServiceConfiguration conf,
                                                           KeySharedMeta ksm) {
+        this(topic, cursor, subscription, conf, ksm, null);
+    }
+
+    /**
+     * @param recentlyJoinedConsumers This parameter is only used for testing.
+     */
+    @VisibleForTesting
+    PersistentStickyKeyDispatcherMultipleConsumersClassic(PersistentTopic 
topic, ManagedCursor cursor,
+                                                  Subscription subscription, 
ServiceConfiguration conf,
+                                                  KeySharedMeta ksm,
+                                                  @Nullable 
LinkedHashMap<Consumer, Position> recentlyJoinedConsumers) {
         super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
 
         this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
-        this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new 
LinkedHashMap<>();
+        if (recentlyJoinedConsumers == null) {
+            this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : 
new LinkedHashMap<>();
+        } else {
+            this.recentlyJoinedConsumers = recentlyJoinedConsumers;
+        }
         this.keySharedMode = ksm.getKeySharedMode();
         switch (this.keySharedMode) {
         case AUTO_SPLIT:
@@ -166,6 +182,10 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassic
         });
     }
 
+    /**
+     * Sort items in the collection "recentlyJoinedConsumers" if needed.
+     * Since we check the order of the queue after each consumer joins, we 
only need to check the last two items.
+     */
     private void sortRecentlyJoinedConsumersIfNeeded() {
         if (!sortRecentlyJoinedConsumersIfNeeded) {
             return;
@@ -173,20 +193,21 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassic
         if (recentlyJoinedConsumers.size() == 1) {
             return;
         }
-        // Since we check the order of queue after each consumer joined, we 
can only check the last two items.
         boolean sortNeeded = false;
-        Position posPre = null;
-        Position posAfter = null;
+        Position secondLatest = null;
+        Position latest = null;
         for (Map.Entry<Consumer, Position> entry : 
recentlyJoinedConsumers.entrySet()) {
-            if (posPre == null) {
-                posPre = entry.getValue();
+            if (secondLatest == null) {
+                secondLatest = entry.getValue();
+            } else if (latest == null) {
+                latest = entry.getValue();
             } else {
-                posPre = posAfter;
-                posAfter = entry.getValue();
+                secondLatest = latest;
+                latest = entry.getValue();
             }
         }
-        if (posPre != null && posAfter != null) {
-            if (posPre.compareTo(posAfter) > 0) {
+        if (secondLatest != null && latest != null) {
+            if (secondLatest.compareTo(latest) > 0) {
                 sortNeeded = true;
             }
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
index fd87ead2017..1ef74915418 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.anySet;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -46,6 +47,7 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -299,6 +301,68 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassicTest {
         persistentDispatcher.close();
     }
 
+    @Test
+    public void testSkipSortRecentlyJoinedConsumersIfNotNeeded() throws 
Exception {
+        // Inject a sorting counter.
+        LinkedHashMap<Consumer, Position> recentlyJoinedConsumers = new 
LinkedHashMap<>();
+        LinkedHashMap<Consumer, Position> spyRecentlyJoinedConsumers = 
spy(recentlyJoinedConsumers);
+        AtomicInteger sortTimes = new AtomicInteger(0);
+        doAnswer(invocationOnMock -> {
+            sortTimes.incrementAndGet();
+            return invocationOnMock.callRealMethod();
+        }).when(spyRecentlyJoinedConsumers).clear();
+
+        PersistentStickyKeyDispatcherMultipleConsumersClassic 
persistentDispatcher =
+                new PersistentStickyKeyDispatcherMultipleConsumersClassic(
+                topicMock, cursorMock, subscriptionMock, configMock,
+                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), 
spyRecentlyJoinedConsumers);
+
+        Consumer consumer0 = createMockConsumer();
+        when(consumer0.consumerName()).thenReturn("0");
+        Consumer consumer1 = createMockConsumer();
+        when(consumer0.consumerName()).thenReturn("MzGG2");
+        Consumer consumer2 = createMockConsumer();
+        when(consumer1.consumerName()).thenReturn("rMOYG");
+        Consumer consumer3 = createMockConsumer();
+        when(consumer2.consumerName()).thenReturn("QIleA");
+
+        
when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L);
+        
when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionFactory.create(-1, 
-1));
+        persistentDispatcher.addConsumer(consumer0).join();
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(1, 1));
+        persistentDispatcher.addConsumer(consumer1).join();
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(1, 1));
+        persistentDispatcher.addConsumer(consumer2).join();
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(1, 2));
+        persistentDispatcher.addConsumer(consumer3).join();
+
+        assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(), 
3);
+
+        Iterator<Map.Entry<Consumer, Position>> itr =
+                
persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator();
+
+        Map.Entry<Consumer, Position> entry1 = itr.next();
+        assertEquals(entry1.getValue(), PositionFactory.create(1, 1));
+        assertEquals(entry1.getKey(), consumer1);
+
+        Map.Entry<Consumer, Position> entry2 = itr.next();
+        assertEquals(entry2.getValue(), PositionFactory.create(1, 1));
+        assertEquals(entry2.getKey(), consumer2);
+
+        Map.Entry<Consumer, Position> entry3 = itr.next();
+        assertEquals(entry3.getValue(), PositionFactory.create(1, 2));
+        assertEquals(entry3.getKey(), consumer3);
+
+        // Verify: no sorting was executed
+        assertEquals(sortTimes.get(), 0);
+
+        // cleanup.
+        persistentDispatcher.close();
+    }
+
     @Test
     public void testSendMarkerMessage() {
         try {

Reply via email to