This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new b505a0f3ce5 Dispatcher did unnecessary sort for recentlyJoinedConsumers and printed noisy error logs (#24634) b505a0f3ce5 is described below commit b505a0f3ce52d0117243390f1284ae42651019e5 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Tue Sep 9 19:17:02 2025 +0800 Dispatcher did unnecessary sort for recentlyJoinedConsumers and printed noisy error logs (#24634) --- ...istentStickyKeyDispatcherMultipleConsumers.java | 42 ++++++++++---- ...ntStickyKeyDispatcherMultipleConsumersTest.java | 64 ++++++++++++++++++++++ 2 files changed, 95 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 14af67b4573..da59e8c6df2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -85,11 +85,26 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi public boolean sortRecentlyJoinedConsumersIfNeeded = true; PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, - Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { + Subscription subscription, ServiceConfiguration conf, + KeySharedMeta ksm) { + this(topic, cursor, subscription, conf, ksm, null); + } + + /** + * @param recentlyJoinedConsumers This parameter is only used for testing. + */ + @VisibleForTesting + PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, + Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm, + LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers) { super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery()); this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery(); - this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); + if (recentlyJoinedConsumers == null) { + this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); + } else { + this.recentlyJoinedConsumers = recentlyJoinedConsumers; + } this.keySharedMode = ksm.getKeySharedMode(); switch (this.keySharedMode) { case AUTO_SPLIT: @@ -154,6 +169,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi }); } + /** + * Sort items in the collection "recentlyJoinedConsumers" if needed. + * Since we check the order of queue after each consumer joined, we can only check the last two items. + */ private void sortRecentlyJoinedConsumersIfNeeded() { if (!sortRecentlyJoinedConsumersIfNeeded) { return; @@ -161,20 +180,21 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi if (recentlyJoinedConsumers.size() == 1) { return; } - // Since we check the order of queue after each consumer joined, we can only check the last two items. boolean sortNeeded = false; - PositionImpl posPre = null; - PositionImpl posAfter = null; + PositionImpl secondLatest = null; + PositionImpl latest = null; for (Map.Entry<Consumer, PositionImpl> entry : recentlyJoinedConsumers.entrySet()) { - if (posPre == null) { - posPre = entry.getValue(); + if (secondLatest == null) { + secondLatest = entry.getValue(); + } else if (latest == null) { + latest = entry.getValue(); } else { - posPre = posAfter; - posAfter = entry.getValue(); + secondLatest = latest; + latest = entry.getValue(); } } - if (posPre != null && posAfter != null) { - if (posPre.compareTo(posAfter) > 0) { + if (secondLatest != null && latest != null) { + if (secondLatest.compareTo(latest) > 0) { sortNeeded = true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 9b7c98cc30e..20ffcff90ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.anySet; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,6 +47,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -299,6 +301,68 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { persistentDispatcher.close(); } + @Test + public void testSkipSortRecentlyJoinedConsumersIfNotNeeded() throws Exception { + // Inject a sorting counter. + LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = new LinkedHashMap<>(); + LinkedHashMap<Consumer, PositionImpl> spyRecentlyJoinedConsumers = spy(recentlyJoinedConsumers); + AtomicInteger sortTimes = new AtomicInteger(0); + doAnswer(invocationOnMock -> { + sortTimes.incrementAndGet(); + return invocationOnMock.callRealMethod(); + }).when(spyRecentlyJoinedConsumers).clear(); + + PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = + new PersistentStickyKeyDispatcherMultipleConsumers( + topicMock, cursorMock, subscriptionMock, configMock, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), spyRecentlyJoinedConsumers); + + Consumer consumer0 = createMockConsumer(); + when(consumer0.consumerName()).thenReturn("0"); + Consumer consumer1 = createMockConsumer(); + when(consumer0.consumerName()).thenReturn("MzGG2"); + Consumer consumer2 = createMockConsumer(); + when(consumer1.consumerName()).thenReturn("rMOYG"); + Consumer consumer3 = createMockConsumer(); + when(consumer2.consumerName()).thenReturn("QIleA"); + + when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L); + when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionImpl.get(-1, -1)); + persistentDispatcher.addConsumer(consumer0).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(1, 1)); + persistentDispatcher.addConsumer(consumer1).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(1, 1)); + persistentDispatcher.addConsumer(consumer2).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(1, 2)); + persistentDispatcher.addConsumer(consumer3).join(); + + assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(), 3); + + Iterator<Map.Entry<Consumer, PositionImpl>> itr = + persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator(); + + Map.Entry<Consumer, PositionImpl> entry1 = itr.next(); + assertEquals(entry1.getValue(), PositionImpl.get(1, 1)); + assertEquals(entry1.getKey(), consumer1); + + Map.Entry<Consumer, PositionImpl> entry2 = itr.next(); + assertEquals(entry2.getValue(), PositionImpl.get(1, 1)); + assertEquals(entry2.getKey(), consumer2); + + Map.Entry<Consumer, PositionImpl> entry3 = itr.next(); + assertEquals(entry3.getValue(), PositionImpl.get(1, 2)); + assertEquals(entry3.getKey(), consumer3); + + // Verify: no sorting was executed + assertEquals(sortTimes.get(), 0); + + // cleanup. + persistentDispatcher.close(); + } + @Test public void testSendMarkerMessage() { try {