This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 70931966000 Issue 16802: fix Repeated messages of shared dispatcher
(#16812)
70931966000 is described below
commit 709319660006fe5dcc48e8e1212d7929901c09be
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Jul 28 21:03:43 2022 +0200
Issue 16802: fix Repeated messages of shared dispatcher (#16812)
(cherry picked from commit 825b68db7bed1c79af4b7b69b48bee76ebe75af5)
---
.../PersistentDispatcherMultipleConsumers.java | 40 +++++++++---
...istentStickyKeyDispatcherMultipleConsumers.java | 13 ++--
...ntStickyKeyDispatcherMultipleConsumersTest.java | 75 +++++++++++++---------
3 files changed, 81 insertions(+), 47 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 7d626fa6a61..91a155a9157 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -86,7 +86,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
-
+ protected boolean sendInProgress;
protected static final
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -232,6 +232,11 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
public synchronized void readMoreEntries() {
+ if (sendInProgress) {
+ // we cannot read more entries while sending the previous batch
+ // otherwise we could re-read the same entries and send duplicates
+ return;
+ }
if (shouldPauseDeliveryForDelayTracker()) {
return;
}
@@ -489,7 +494,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
@Override
- public synchronized void readEntriesComplete(List<Entry> entries, Object
ctx) {
+ public final synchronized void readEntriesComplete(List<Entry> entries,
Object ctx) {
ReadType readType = (ReadType) ctx;
if (readType == ReadType.Normal) {
havePendingRead = false;
@@ -524,8 +529,26 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
sendMessagesToConsumers(readType, entries);
}
- protected void sendMessagesToConsumers(ReadType readType, List<Entry>
entries) {
+ protected final synchronized void sendMessagesToConsumers(ReadType
readType, List<Entry> entries) {
+ sendInProgress = true;
+ boolean readMoreEntries;
+ try {
+ readMoreEntries = trySendMessagesToConsumers(readType, entries);
+ } finally {
+ sendInProgress = false;
+ }
+ if (readMoreEntries) {
+ readMoreEntries();
+ }
+ }
+ /**
+ * Dispatch the messages to the Consumers.
+ * @return true if you want to trigger a new read.
+ * This method is overridden by other classes; please take a look at the other
implementations
+ * if you need to change it.
+ */
+ protected synchronized boolean trySendMessagesToConsumers(ReadType
readType, List<Entry> entries) {
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
@@ -533,8 +556,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
int entriesToDispatch = entries.size();
// Trigger read more messages
if (entriesToDispatch == 0) {
- readMoreEntries();
- return;
+ return true;
}
EntryWrapper[] entryWrappers = new EntryWrapper[entries.size()];
int remainingMessages = updateEntryWrapperWithMetadata(entryWrappers,
entries);
@@ -559,7 +581,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
log.info("[{}] rewind because no available consumer found from
total {}", name, consumerList.size());
entries.subList(start, entries.size()).forEach(Entry::release);
cursor.rewind();
- return;
+ return false;
}
// round-robin dispatch batch size for this consumer
@@ -606,7 +628,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
entriesToDispatch -= messagesForC;
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-(msgSent -
batchIndexesAcks.getTotalAckedIndexCount()));
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("[{}] Added -({} minus {}) permits to
TOTAL_AVAILABLE_PERMITS_UPDATER in "
+ "PersistentDispatcherMultipleConsumers",
name, msgSent,
batchIndexesAcks.getTotalAckedIndexCount());
@@ -649,9 +671,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
entry.release();
});
}
- // We should not call readMoreEntries() recursively in the same thread
- // as there is a risk of StackOverflowError
- topic.getBrokerService().executor().execute(this::readMoreEntries);
+ return true;
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 5be832934db..a6032a12c3a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -148,7 +148,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
};
@Override
- protected void sendMessagesToConsumers(ReadType readType, List<Entry>
entries) {
+ protected synchronized boolean trySendMessagesToConsumers(ReadType
readType, List<Entry> entries) {
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
@@ -156,14 +156,13 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// Trigger read more messages
if (entriesCount == 0) {
- readMoreEntries();
- return;
+ return true;
}
if (consumerSet.isEmpty()) {
entries.forEach(Entry::release);
cursor.rewind();
- return;
+ return false;
}
// A corner case that we have to retry a readMoreEntries in order to
preserver order delivery.
@@ -198,8 +197,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
- readMoreEntries();
- return;
+ return true;
}
}
}
@@ -322,14 +320,17 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
isDispatcherStuckOnReplays = true;
// readMoreEntries should run regardless whether or not stuck is
caused by
// stuckConsumers for avoid stopping dispatch.
+ sendInProgress = false;
topic.getBrokerService().executor().execute(() ->
readMoreEntries());
} else if (currentThreadKeyNumber == 0) {
+ sendInProgress = false;
topic.getBrokerService().executor().schedule(() -> {
synchronized
(PersistentStickyKeyDispatcherMultipleConsumers.this) {
readMoreEntries();
}
}, 100, TimeUnit.MILLISECONDS);
}
+ return false;
}
private int getRestrictedMaxEntriesForConsumer(Consumer consumer,
List<Entry> entries, int maxMessages,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 99a66f44ac4..ede9baa8fff 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -49,6 +49,7 @@ import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import io.netty.channel.EventLoopGroup;
@@ -71,6 +72,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.testng.Assert;
@@ -101,6 +103,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
+
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -114,7 +117,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
doReturn(eventLoopGroup).when(brokerMock).executor();
doAnswer(invocation -> {
- ((Runnable)invocation.getArguments()[0]).run();
+ orderedExecutor.execute(((Runnable)invocation.getArguments()[0]));
return null;
}).when(eventLoopGroup).execute(any(Runnable.class));
@@ -180,19 +183,21 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
fail("Failed to readEntriesComplete.", e);
}
- ArgumentCaptor<Integer> totalMessagesCaptor =
ArgumentCaptor.forClass(Integer.class);
- verify(consumerMock, times(1)).sendMessages(
- anyList(),
- any(EntryBatchSizes.class),
- any(EntryBatchIndexesAcks.class),
- totalMessagesCaptor.capture(),
- anyLong(),
- anyLong(),
- any(RedeliveryTracker.class)
- );
-
- List<Integer> allTotalMessagesCaptor =
totalMessagesCaptor.getAllValues();
- Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
+ Awaitility.await().untilAsserted(() -> {
+ ArgumentCaptor<Integer> totalMessagesCaptor =
ArgumentCaptor.forClass(Integer.class);
+ verify(consumerMock, times(1)).sendMessages(
+ anyList(),
+ any(EntryBatchSizes.class),
+ any(EntryBatchIndexesAcks.class),
+ totalMessagesCaptor.capture(),
+ anyLong(),
+ anyLong(),
+ any(RedeliveryTracker.class)
+ );
+
+ List<Integer> allTotalMessagesCaptor =
totalMessagesCaptor.getAllValues();
+
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
+ });
}
@Test(timeOut = 10000)
@@ -292,21 +297,23 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
// and then stop to dispatch to slowConsumer
persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal,
redeliverEntries);
- verify(consumerMock, times(1)).sendMessages(
- argThat(arg -> {
- assertEquals(arg.size(), 1);
- Entry entry = arg.get(0);
- assertEquals(entry.getLedgerId(), 1);
- assertEquals(entry.getEntryId(), 3);
- return true;
- }),
- any(EntryBatchSizes.class),
- any(EntryBatchIndexesAcks.class),
- anyInt(),
- anyLong(),
- anyLong(),
- any(RedeliveryTracker.class)
- );
+ Awaitility.await().untilAsserted(() -> {
+ verify(consumerMock, times(1)).sendMessages(
+ argThat(arg -> {
+ assertEquals(arg.size(), 1);
+ Entry entry = arg.get(0);
+ assertEquals(entry.getLedgerId(), 1);
+ assertEquals(entry.getEntryId(), 3);
+ return true;
+ }),
+ any(EntryBatchSizes.class),
+ any(EntryBatchIndexesAcks.class),
+ anyInt(),
+ anyLong(),
+ anyLong(),
+ any(RedeliveryTracker.class)
+ );
+ });
verify(slowConsumerMock, times(0)).sendMessages(
anyList(),
any(EntryBatchSizes.class),
@@ -408,9 +415,15 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay),
anyBoolean());
// Mock Cursor#asyncReadEntriesOrWait
+ AtomicBoolean asyncReadEntriesOrWaitCalled = new AtomicBoolean();
doAnswer(invocationOnMock -> {
- ((PersistentStickyKeyDispatcherMultipleConsumers)
invocationOnMock.getArgument(2))
- .readEntriesComplete(readEntries,
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ if (asyncReadEntriesOrWaitCalled.compareAndSet(false, true)) {
+ ((PersistentStickyKeyDispatcherMultipleConsumers)
invocationOnMock.getArgument(2))
+ .readEntriesComplete(readEntries,
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ } else {
+ ((PersistentStickyKeyDispatcherMultipleConsumers)
invocationOnMock.getArgument(2))
+ .readEntriesComplete(Collections.emptyList(),
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ }
return null;
}).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(),
any(PersistentStickyKeyDispatcherMultipleConsumers.class),