This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new c2940b3c13c [fix][broker]Dispatcher did unnecessary sort for recentlyJoinedConsumers and printed noisy error logs (#24634) c2940b3c13c is described below commit c2940b3c13cef4d935655d9a7b117a43471bbed1 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Tue Sep 9 19:17:02 2025 +0800 [fix][broker]Dispatcher did unnecessary sort for recentlyJoinedConsumers and printed noisy error logs (#24634) --- ...tickyKeyDispatcherMultipleConsumersClassic.java | 41 ++++++++++---- ...yKeyDispatcherMultipleConsumersClassicTest.java | 64 ++++++++++++++++++++++ 2 files changed, 95 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 56161d8dd15..c3b246fe9ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -58,6 +58,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.util.FutureUtil; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,10 +96,25 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic PersistentStickyKeyDispatcherMultipleConsumersClassic(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { + this(topic, cursor, subscription, conf, ksm, null); + } + + /** + * @param recentlyJoinedConsumers This 
parameter is only used for testing. + */ + @VisibleForTesting + PersistentStickyKeyDispatcherMultipleConsumersClassic(PersistentTopic topic, ManagedCursor cursor, + Subscription subscription, ServiceConfiguration conf, + KeySharedMeta ksm, + @Nullable LinkedHashMap<Consumer, Position> recentlyJoinedConsumers) { super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery()); this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery(); - this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); + if (recentlyJoinedConsumers == null) { + this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); + } else { + this.recentlyJoinedConsumers = recentlyJoinedConsumers; + } this.keySharedMode = ksm.getKeySharedMode(); switch (this.keySharedMode) { case AUTO_SPLIT: @@ -166,6 +182,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic }); } + /** + * Sort items in the collection "recentlyJoinedConsumers" if needed. + * Since we check the order of queue after each consumer joined, we can only check the last two items. + */ private void sortRecentlyJoinedConsumersIfNeeded() { if (!sortRecentlyJoinedConsumersIfNeeded) { return; @@ -173,20 +193,21 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic if (recentlyJoinedConsumers.size() == 1) { return; } - // Since we check the order of queue after each consumer joined, we can only check the last two items. 
boolean sortNeeded = false; - Position posPre = null; - Position posAfter = null; + Position secondLatest = null; + Position latest = null; for (Map.Entry<Consumer, Position> entry : recentlyJoinedConsumers.entrySet()) { - if (posPre == null) { - posPre = entry.getValue(); + if (secondLatest == null) { + secondLatest = entry.getValue(); + } else if (latest == null) { + latest = entry.getValue(); } else { - posPre = posAfter; - posAfter = entry.getValue(); + secondLatest = latest; + latest = entry.getValue(); } } - if (posPre != null && posAfter != null) { - if (posPre.compareTo(posAfter) > 0) { + if (secondLatest != null && latest != null) { + if (secondLatest.compareTo(latest) > 0) { sortNeeded = true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java index fd87ead2017..1ef74915418 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.anySet; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,6 +47,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -299,6 +301,68 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassicTest {
         persistentDispatcher.close();
     }
 
+    @Test
+    public void testSkipSortRecentlyJoinedConsumersIfNotNeeded() throws Exception {
+        // Inject a sorting counter: clear() calls on the spied map are counted as a proxy for re-sorts.
+        LinkedHashMap<Consumer, Position> recentlyJoinedConsumers = new LinkedHashMap<>();
+        LinkedHashMap<Consumer, Position> spyRecentlyJoinedConsumers = spy(recentlyJoinedConsumers);
+        AtomicInteger sortTimes = new AtomicInteger(0);
+        doAnswer(invocationOnMock -> {
+            sortTimes.incrementAndGet();
+            return invocationOnMock.callRealMethod();
+        }).when(spyRecentlyJoinedConsumers).clear();
+
+        PersistentStickyKeyDispatcherMultipleConsumersClassic persistentDispatcher =
+                new PersistentStickyKeyDispatcherMultipleConsumersClassic(
+                        topicMock, cursorMock, subscriptionMock, configMock,
+                        new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), spyRecentlyJoinedConsumers);
+
+        Consumer consumer0 = createMockConsumer();
+        when(consumer0.consumerName()).thenReturn("0");
+        Consumer consumer1 = createMockConsumer();
+        when(consumer1.consumerName()).thenReturn("MzGG2");
+        Consumer consumer2 = createMockConsumer();
+        when(consumer2.consumerName()).thenReturn("rMOYG");
+        Consumer consumer3 = createMockConsumer();
+        when(consumer3.consumerName()).thenReturn("QIleA");
+
+        when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L);
+        when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionFactory.create(-1, -1));
+        persistentDispatcher.addConsumer(consumer0).join();
+
+        when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(1, 1));
+        persistentDispatcher.addConsumer(consumer1).join();
+
+        when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(1, 1));
+        persistentDispatcher.addConsumer(consumer2).join();
+
+        when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(1, 2));
+        persistentDispatcher.addConsumer(consumer3).join();
+
+        assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(), 3);
+
+        Iterator<Map.Entry<Consumer, Position>> itr =
+                persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator();
+
+        Map.Entry<Consumer, Position> entry1 = itr.next();
+        assertEquals(entry1.getValue(), PositionFactory.create(1, 1));
+        assertEquals(entry1.getKey(), consumer1);
+
+        Map.Entry<Consumer, Position> entry2 = itr.next();
+        assertEquals(entry2.getValue(), PositionFactory.create(1, 1));
+        assertEquals(entry2.getKey(), consumer2);
+
+        Map.Entry<Consumer, Position> entry3 = itr.next();
+        assertEquals(entry3.getValue(), PositionFactory.create(1, 2));
+        assertEquals(entry3.getKey(), consumer3);
+
+        // Verify: positions were already in ascending join order, so the map was never cleared (no sort ran).
+        assertEquals(sortTimes.get(), 0);
+
+        // cleanup.
+        persistentDispatcher.close();
+    }
+
     @Test
     public void testSendMarkerMessage() {
         try {