This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 70931966000 Issue 16802: fix Repeated messages of shared dispatcher 
(#16812)
70931966000 is described below

commit 709319660006fe5dcc48e8e1212d7929901c09be
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Jul 28 21:03:43 2022 +0200

    Issue 16802: fix Repeated messages of shared dispatcher (#16812)
    
    (cherry picked from commit 825b68db7bed1c79af4b7b69b48bee76ebe75af5)
---
 .../PersistentDispatcherMultipleConsumers.java     | 40 +++++++++---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 13 ++--
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 75 +++++++++++++---------
 3 files changed, 81 insertions(+), 47 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 7d626fa6a61..91a155a9157 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -86,7 +86,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
-
+    protected boolean sendInProgress;
     protected static final 
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_AVAILABLE_PERMITS_UPDATER =
             
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -232,6 +232,11 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     }
 
     public synchronized void readMoreEntries() {
+        if (sendInProgress) {
+            // we cannot read more entries while sending the previous batch
+            // otherwise we could re-read the same entries and send duplicates
+            return;
+        }
         if (shouldPauseDeliveryForDelayTracker()) {
             return;
         }
@@ -489,7 +494,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     }
 
     @Override
-    public synchronized void readEntriesComplete(List<Entry> entries, Object 
ctx) {
+    public final synchronized void readEntriesComplete(List<Entry> entries, 
Object ctx) {
         ReadType readType = (ReadType) ctx;
         if (readType == ReadType.Normal) {
             havePendingRead = false;
@@ -524,8 +529,26 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         sendMessagesToConsumers(readType, entries);
     }
 
-    protected void sendMessagesToConsumers(ReadType readType, List<Entry> 
entries) {
+    protected final synchronized void sendMessagesToConsumers(ReadType 
readType, List<Entry> entries) {
+        sendInProgress = true;
+        boolean readMoreEntries;
+        try {
+            readMoreEntries = trySendMessagesToConsumers(readType, entries);
+        } finally {
+            sendInProgress = false;
+        }
+        if (readMoreEntries) {
+            readMoreEntries();
+        }
+    }
 
+    /**
+     * Dispatch the messages to the Consumers.
+     * @return true if you want to trigger a new read.
+     * This method is overridden by other classes, please take a look to other 
implementations
+     * if you need to change it.
+     */
+    protected synchronized boolean trySendMessagesToConsumers(ReadType 
readType, List<Entry> entries) {
         if (needTrimAckedMessages()) {
             cursor.trimDeletedEntries(entries);
         }
@@ -533,8 +556,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         int entriesToDispatch = entries.size();
         // Trigger read more messages
         if (entriesToDispatch == 0) {
-            readMoreEntries();
-            return;
+            return true;
         }
         EntryWrapper[] entryWrappers = new EntryWrapper[entries.size()];
         int remainingMessages = updateEntryWrapperWithMetadata(entryWrappers, 
entries);
@@ -559,7 +581,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 log.info("[{}] rewind because no available consumer found from 
total {}", name, consumerList.size());
                 entries.subList(start, entries.size()).forEach(Entry::release);
                 cursor.rewind();
-                return;
+                return false;
             }
 
             // round-robin dispatch batch size for this consumer
@@ -606,7 +628,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 entriesToDispatch -= messagesForC;
                 TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
                         -(msgSent - 
batchIndexesAcks.getTotalAckedIndexCount()));
-                if (log.isDebugEnabled()){
+                if (log.isDebugEnabled()) {
                     log.debug("[{}] Added -({} minus {}) permits to 
TOTAL_AVAILABLE_PERMITS_UPDATER in "
                                     + "PersistentDispatcherMultipleConsumers",
                             name, msgSent, 
batchIndexesAcks.getTotalAckedIndexCount());
@@ -649,9 +671,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 entry.release();
             });
         }
-        // We should not call readMoreEntries() recursively in the same thread
-        // as there is a risk of StackOverflowError
-        topic.getBrokerService().executor().execute(this::readMoreEntries);
+        return true;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 5be832934db..a6032a12c3a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -148,7 +148,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             };
 
     @Override
-    protected void sendMessagesToConsumers(ReadType readType, List<Entry> 
entries) {
+    protected synchronized boolean trySendMessagesToConsumers(ReadType 
readType, List<Entry> entries) {
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
         long totalEntries = 0;
@@ -156,14 +156,13 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
         // Trigger read more messages
         if (entriesCount == 0) {
-            readMoreEntries();
-            return;
+            return true;
         }
 
         if (consumerSet.isEmpty()) {
             entries.forEach(Entry::release);
             cursor.rewind();
-            return;
+            return false;
         }
 
         // A corner case that we have to retry a readMoreEntries in order to 
preserver order delivery.
@@ -198,8 +197,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                         } else if (readType == ReadType.Replay) {
                             entries.forEach(Entry::release);
                         }
-                        readMoreEntries();
-                        return;
+                        return true;
                     }
                 }
             }
@@ -322,14 +320,17 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             isDispatcherStuckOnReplays = true;
             // readMoreEntries should run regardless whether or not stuck is 
caused by
             // stuckConsumers for avoid stopping dispatch.
+            sendInProgress = false;
             topic.getBrokerService().executor().execute(() -> 
readMoreEntries());
         }  else if (currentThreadKeyNumber == 0) {
+            sendInProgress = false;
             topic.getBrokerService().executor().schedule(() -> {
                 synchronized 
(PersistentStickyKeyDispatcherMultipleConsumers.this) {
                     readMoreEntries();
                 }
             }, 100, TimeUnit.MILLISECONDS);
         }
+        return false;
     }
 
     private int getRestrictedMaxEntriesForConsumer(Consumer consumer, 
List<Entry> entries, int maxMessages,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 99a66f44ac4..ede9baa8fff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -49,6 +49,7 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import io.netty.channel.EventLoopGroup;
@@ -71,6 +72,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
 import org.testng.Assert;
@@ -101,6 +103,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
         
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
         
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
+        
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
 
         pulsarMock = mock(PulsarService.class);
         doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -114,7 +117,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
         doReturn(eventLoopGroup).when(brokerMock).executor();
         doAnswer(invocation -> {
-            ((Runnable)invocation.getArguments()[0]).run();
+            orderedExecutor.execute(((Runnable)invocation.getArguments()[0]));
             return null;
         }).when(eventLoopGroup).execute(any(Runnable.class));
 
@@ -180,19 +183,21 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
             fail("Failed to readEntriesComplete.", e);
         }
 
-        ArgumentCaptor<Integer> totalMessagesCaptor = 
ArgumentCaptor.forClass(Integer.class);
-        verify(consumerMock, times(1)).sendMessages(
-                anyList(),
-                any(EntryBatchSizes.class),
-                any(EntryBatchIndexesAcks.class),
-                totalMessagesCaptor.capture(),
-                anyLong(),
-                anyLong(),
-                any(RedeliveryTracker.class)
-        );
-
-        List<Integer> allTotalMessagesCaptor = 
totalMessagesCaptor.getAllValues();
-        Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
+        Awaitility.await().untilAsserted(() -> {
+                    ArgumentCaptor<Integer> totalMessagesCaptor = 
ArgumentCaptor.forClass(Integer.class);
+                    verify(consumerMock, times(1)).sendMessages(
+                            anyList(),
+                            any(EntryBatchSizes.class),
+                            any(EntryBatchIndexesAcks.class),
+                            totalMessagesCaptor.capture(),
+                            anyLong(),
+                            anyLong(),
+                            any(RedeliveryTracker.class)
+                    );
+
+                    List<Integer> allTotalMessagesCaptor = 
totalMessagesCaptor.getAllValues();
+                    
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
+                });
     }
 
     @Test(timeOut = 10000)
@@ -292,21 +297,23 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         // and then stop to dispatch to slowConsumer
         
persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal,
 redeliverEntries);
 
-        verify(consumerMock, times(1)).sendMessages(
-                argThat(arg -> {
-                    assertEquals(arg.size(), 1);
-                    Entry entry = arg.get(0);
-                    assertEquals(entry.getLedgerId(), 1);
-                    assertEquals(entry.getEntryId(), 3);
-                    return true;
-                }),
-                any(EntryBatchSizes.class),
-                any(EntryBatchIndexesAcks.class),
-                anyInt(),
-                anyLong(),
-                anyLong(),
-                any(RedeliveryTracker.class)
-        );
+        Awaitility.await().untilAsserted(() -> {
+            verify(consumerMock, times(1)).sendMessages(
+                    argThat(arg -> {
+                        assertEquals(arg.size(), 1);
+                        Entry entry = arg.get(0);
+                        assertEquals(entry.getLedgerId(), 1);
+                        assertEquals(entry.getEntryId(), 3);
+                        return true;
+                    }),
+                    any(EntryBatchSizes.class),
+                    any(EntryBatchIndexesAcks.class),
+                    anyInt(),
+                    anyLong(),
+                    anyLong(),
+                    any(RedeliveryTracker.class)
+            );
+        });
         verify(slowConsumerMock, times(0)).sendMessages(
                 anyList(),
                 any(EntryBatchSizes.class),
@@ -408,9 +415,15 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
                 
eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), 
anyBoolean());
 
         // Mock Cursor#asyncReadEntriesOrWait
+        AtomicBoolean asyncReadEntriesOrWaitCalled = new AtomicBoolean();
         doAnswer(invocationOnMock -> {
-            ((PersistentStickyKeyDispatcherMultipleConsumers) 
invocationOnMock.getArgument(2))
-                    .readEntriesComplete(readEntries, 
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+            if (asyncReadEntriesOrWaitCalled.compareAndSet(false, true)) {
+                ((PersistentStickyKeyDispatcherMultipleConsumers) 
invocationOnMock.getArgument(2))
+                        .readEntriesComplete(readEntries, 
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+            } else {
+                ((PersistentStickyKeyDispatcherMultipleConsumers) 
invocationOnMock.getArgument(2))
+                        .readEntriesComplete(Collections.emptyList(), 
PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+            }
             return null;
         }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(),
                 any(PersistentStickyKeyDispatcherMultipleConsumers.class),

Reply via email to