This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 825b68db7be Issue 16802: fix Repeated messages of shared dispatcher (#16812) 825b68db7be is described below commit 825b68db7bed1c79af4b7b69b48bee76ebe75af5 Author: Enrico Olivelli <eolive...@apache.org> AuthorDate: Thu Jul 28 21:03:43 2022 +0200 Issue 16802: fix Repeated messages of shared dispatcher (#16812) --- .../PersistentDispatcherMultipleConsumers.java | 47 ++++++++++---- ...istentStickyKeyDispatcherMultipleConsumers.java | 13 ++-- .../service/persistent/DelayedDeliveryTest.java | 1 - ...ntStickyKeyDispatcherMultipleConsumersTest.java | 75 +++++++++++++--------- 4 files changed, 87 insertions(+), 49 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 71faeb7adba..cf58bfd43ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -89,7 +89,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile PositionImpl minReplayedPosition = null; protected boolean shouldRewindBeforeReadingOrReplaying = false; protected final String name; - + protected boolean sendInProgress; protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, @@ -240,6 +240,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } public synchronized void readMoreEntries() { + if (sendInProgress) { + // we cannot read more entries while sending the previous batch + // otherwise we could re-read the same entries and send duplicates + return; + } if (shouldPauseDeliveryForDelayTracker()) { return; } @@ -496,7 +501,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } @Override - public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) { + public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) { ReadType readType = (ReadType) ctx; if (readType == ReadType.Normal) { havePendingRead = false; @@ -528,18 +533,39 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size()); } + // dispatch messages to a separate thread, but still in order for this subscription + // sendMessagesToConsumers is responsible for running broker-side filters + // that may be quite expensive if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) { - // dispatch messages to a separate thread, but still in order for this subscription - // sendMessagesToConsumers is responsible for running broker-side filters - // that may be quite expensive + // setting sendInProgress here, because sendMessagesToConsumers will be executed + // in a separate thread, and we want to prevent more reads + sendInProgress = true; dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries))); } else { sendMessagesToConsumers(readType, entries); } } - protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) { + protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) { + sendInProgress = true; + boolean readMoreEntries; + try { + readMoreEntries = trySendMessagesToConsumers(readType, entries); + } finally { + sendInProgress = false; + } + if (readMoreEntries) { + readMoreEntries(); + } + } + /** + * Dispatch the messages to the Consumers. + * @return true if you want to trigger a new read. + * This method is overridden by other classes, please take a look to other implementations + * if you need to change it. + */ + protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) { if (needTrimAckedMessages()) { cursor.trimDeletedEntries(entries); } @@ -547,8 +573,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul int entriesToDispatch = entries.size(); // Trigger read more messages if (entriesToDispatch == 0) { - readMoreEntries(); - return; + return true; } final MessageMetadata[] metadataArray = entries.stream() .map(entry -> Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1)) @@ -578,7 +603,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size()); entries.subList(start, entries.size()).forEach(Entry::release); cursor.rewind(); - return; + return false; } // round-robin dispatch batch size for this consumer @@ -623,7 +648,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul entriesToDispatch -= messagesForC; TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - batchIndexesAcks.getTotalAckedIndexCount())); - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in " + "PersistentDispatcherMultipleConsumers", name, msgSent, batchIndexesAcks.getTotalAckedIndexCount()); @@ -658,7 +683,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul entry.release(); }); } - readMoreEntries(); + return true; } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 558e3f129ce..e42995e9247 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -152,7 +152,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi }; @Override - protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) { + protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) { long totalMessagesSent = 0; long totalBytesSent = 0; long totalEntries = 0; @@ -160,14 +160,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi // Trigger read more messages if (entriesCount == 0) { - readMoreEntries(); - return; + return true; } if (consumerSet.isEmpty()) { entries.forEach(Entry::release); cursor.rewind(); - return; + return false; } // A corner case that we have to retry a readMoreEntries in order to preserver order delivery. @@ -201,8 +200,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } else if (readType == ReadType.Replay) { entries.forEach(Entry::release); } - readMoreEntries(); - return; + return true; } } } @@ -331,14 +329,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } // readMoreEntries should run regardless whether or not stuck is caused by // stuckConsumers for avoid stopping dispatch. + sendInProgress = false; topic.getBrokerService().executor().execute(() -> readMoreEntries()); } else if (currentThreadKeyNumber == 0) { + sendInProgress = false; topic.getBrokerService().executor().schedule(() -> { synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { readMoreEntries(); } }, 100, TimeUnit.MILLISECONDS); } + return false; } private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index aa787907329..8b62845572d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -278,7 +278,6 @@ public class DelayedDeliveryTest extends ProducerConsumerBase { for (int i = 0; i < N; i++) { msg = consumer.receive(10, TimeUnit.SECONDS); receivedMsgs.add(msg.getValue()); - consumer.acknowledge(msg); } assertEquals(receivedMsgs.size(), N); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 72286b01c76..aa87b2aaa25 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import io.netty.channel.EventLoopGroup; @@ -69,6 +70,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; +import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -99,6 +101,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { doReturn(100).when(configMock).getDispatcherMaxReadBatchSize(); doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); + doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); @@ -115,7 +118,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class); doReturn(eventLoopGroup).when(brokerMock).executor(); doAnswer(invocation -> { - ((Runnable)invocation.getArguments()[0]).run(); + orderedExecutor.execute(((Runnable)invocation.getArguments()[0])); return null; }).when(eventLoopGroup).execute(any(Runnable.class)); @@ -180,19 +183,21 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { fail("Failed to readEntriesComplete.", e); } - ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class); - verify(consumerMock, times(1)).sendMessages( - anyList(), - any(EntryBatchSizes.class), - any(EntryBatchIndexesAcks.class), - totalMessagesCaptor.capture(), - anyLong(), - anyLong(), - any(RedeliveryTracker.class) - ); - - List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues(); - Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5); + Awaitility.await().untilAsserted(() -> { + ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class); + verify(consumerMock, times(1)).sendMessages( + anyList(), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + totalMessagesCaptor.capture(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + + List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues(); + Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5); + }); } @Test(timeOut = 10000) @@ -283,21 +288,23 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { // and then stop to dispatch to slowConsumer persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries); - verify(consumerMock, times(1)).sendMessages( - argThat(arg -> { - assertEquals(arg.size(), 1); - Entry entry = arg.get(0); - assertEquals(entry.getLedgerId(), 1); - assertEquals(entry.getEntryId(), 3); - return true; - }), - any(EntryBatchSizes.class), - any(EntryBatchIndexesAcks.class), - anyInt(), - anyLong(), - anyLong(), - any(RedeliveryTracker.class) - ); + Awaitility.await().untilAsserted(() -> { + verify(consumerMock, times(1)).sendMessages( + argThat(arg -> { + assertEquals(arg.size(), 1); + Entry entry = arg.get(0); + assertEquals(entry.getLedgerId(), 1); + assertEquals(entry.getEntryId(), 3); + return true; + }), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + anyInt(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); + }); verify(slowConsumerMock, times(0)).sendMessages( anyList(), any(EntryBatchSizes.class), @@ -399,9 +406,15 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean()); // Mock Cursor#asyncReadEntriesOrWait + AtomicBoolean asyncReadEntriesOrWaitCalled = new AtomicBoolean(); doAnswer(invocationOnMock -> { - ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) - .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + if (asyncReadEntriesOrWaitCalled.compareAndSet(false, true)) { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) + .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + } else { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) + .readEntriesComplete(Collections.emptyList(), PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + } return null; }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(), any(PersistentStickyKeyDispatcherMultipleConsumers.class),