This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c304bf5be93403bf628cf1eb8b3e80bfbf15ca92
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jan 2 15:54:34 2025 +0800

    [fix][broker] Msg delivery is stuck due to items in the collection 
recentlyJoinedConsumers are out-of-order (#23795)
    
    (cherry picked from commit 4a01423273c010eebce549139b2fc400bb59555e)
---
 ...tickyKeyDispatcherMultipleConsumersClassic.java | 40 ++++++++++-
 ...yKeyDispatcherMultipleConsumersClassicTest.java | 81 ++++++++++++++++++++++
 2 files changed, 119 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
index c227bf5b435..71f37c5939d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
@@ -146,11 +146,44 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassic
                         && consumerList.size() > 1
                         && 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
                     recentlyJoinedConsumers.put(consumer, 
readPositionWhenJoining);
+                    sortRecentlyJoinedConsumersIfNeeded();
                 }
             }
         });
     }
 
+    private void sortRecentlyJoinedConsumersIfNeeded() {
+        if (recentlyJoinedConsumers.size() == 1) {
+            return;
+        }
+        boolean sortNeeded = false;
+        Position posPre = null;
+        Position posAfter = null;
+        for (Map.Entry<Consumer, Position> entry : 
recentlyJoinedConsumers.entrySet()) {
+            if (posPre == null) {
+                posPre = entry.getValue();
+            } else {
+                posAfter = entry.getValue();
+            }
+            if (posPre != null && posAfter != null) {
+                if (posPre.compareTo(posAfter) > 0) {
+                    sortNeeded = true;
+                    break;
+                }
+                posPre = posAfter;
+            }
+        }
+
+        if (sortNeeded) {
+            List<Map.Entry<Consumer, Position>> sortedList = new 
ArrayList<>(recentlyJoinedConsumers.entrySet());
+            Collections.sort(sortedList, Map.Entry.comparingByValue());
+            recentlyJoinedConsumers.clear();
+            for (Map.Entry<Consumer, Position> entry : sortedList) {
+                recentlyJoinedConsumers.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws 
BrokerServiceException {
         // The consumer must be removed from the selector before calling the 
superclass removeConsumer method.
@@ -560,8 +593,11 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassic
                 && ksm.isAllowOutOfOrderDelivery() == 
this.allowOutOfOrderDelivery);
     }
 
-    public LinkedHashMap<Consumer, Position> getRecentlyJoinedConsumers() {
-        return recentlyJoinedConsumers;
+    public synchronized LinkedHashMap<Consumer, Position> 
getRecentlyJoinedConsumers() {
+        if (recentlyJoinedConsumers == null) {
+            return null;
+        }
+        return new LinkedHashMap<>(recentlyJoinedConsumers);
     }
 
     public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
index 1f40fd46aa3..af42fc3dca4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
@@ -45,7 +45,9 @@ import io.netty.channel.EventLoopGroup;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -182,6 +184,85 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassicTest {
         
assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
     }
 
+    @Test
+    public void testSortRecentlyJoinedConsumersIfNeeded() throws Exception {
+        PersistentStickyKeyDispatcherMultipleConsumersClassic 
persistentDispatcher =
+                new PersistentStickyKeyDispatcherMultipleConsumersClassic(
+                topicMock, cursorMock, subscriptionMock, configMock,
+                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
+
+        Consumer consumer0 = mock(Consumer.class);
+        when(consumer0.consumerName()).thenReturn("c0-1");
+        Consumer consumer1 = mock(Consumer.class);
+        when(consumer1.consumerName()).thenReturn("c1");
+        Consumer consumer2 = mock(Consumer.class);
+        when(consumer2.consumerName()).thenReturn("c2");
+        Consumer consumer3 = mock(Consumer.class);
+        when(consumer3.consumerName()).thenReturn("c3");
+        Consumer consumer4 = mock(Consumer.class);
+        when(consumer4.consumerName()).thenReturn("c4");
+        Consumer consumer5 = mock(Consumer.class);
+        when(consumer5.consumerName()).thenReturn("c5");
+        Consumer consumer6 = mock(Consumer.class);
+        when(consumer6.consumerName()).thenReturn("c6");
+
+        
when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L);
+        
when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionFactory.create(-1, 
-1));
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(0, 0));
+        persistentDispatcher.addConsumer(consumer0).join();
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(4, 1));
+        persistentDispatcher.addConsumer(consumer1).join();
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 2));
+        persistentDispatcher.addConsumer(consumer2).join();
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 1));
+        persistentDispatcher.addConsumer(consumer3).join();
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 3));
+        persistentDispatcher.addConsumer(consumer4).join();
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(4, 2));
+        persistentDispatcher.addConsumer(consumer5).join();
+
+        
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(6, 1));
+        persistentDispatcher.addConsumer(consumer6).join();
+
+        assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(), 
6);
+
+        Iterator<Map.Entry<Consumer, Position>> itr
+                = 
persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator();
+
+        Map.Entry<Consumer, Position> entry1 = itr.next();
+        assertEquals(entry1.getValue(), PositionFactory.create(4, 1));
+        assertEquals(entry1.getKey(), consumer1);
+
+        Map.Entry<Consumer, Position> entry2 = itr.next();
+        assertEquals(entry2.getValue(), PositionFactory.create(4, 2));
+        assertEquals(entry2.getKey(), consumer5);
+
+        Map.Entry<Consumer, Position> entry3 = itr.next();
+        assertEquals(entry3.getValue(), PositionFactory.create(5, 1));
+        assertEquals(entry3.getKey(), consumer3);
+
+        Map.Entry<Consumer, Position> entry4 = itr.next();
+        assertEquals(entry4.getValue(), PositionFactory.create(5, 2));
+        assertEquals(entry4.getKey(), consumer2);
+
+        Map.Entry<Consumer, Position> entry5 = itr.next();
+        assertEquals(entry5.getValue(), PositionFactory.create(5, 3));
+        assertEquals(entry5.getKey(), consumer4);
+
+        Map.Entry<Consumer, Position> entry6 = itr.next();
+        assertEquals(entry6.getValue(), PositionFactory.create(6, 1));
+        assertEquals(entry6.getKey(), consumer6);
+
+        // cleanup.
+        persistentDispatcher.close();
+    }
+
     @Test
     public void testSendMarkerMessage() {
         try {

Reply via email to