This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f6972766f1 [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher)  (#17163)
7f6972766f1 is described below

commit 7f6972766f1f8a989f477cbb3085d0660be53bd8
Author: Nicolò Boschi <boschi1...@gmail.com>
AuthorDate: Sat Aug 20 09:03:09 2022 +0200

    [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher)  (#17163)
    
    * [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions
    
    * move test to broker-api
---
 .../persistent/PersistentStreamingDispatcherMultipleConsumers.java   | 5 +++++
 .../SimpleProducerConsumerTestStreamingDispatcherTest.java           | 2 +-
 .../org/apache/pulsar/client/api/SimpleProducerConsumerTest.java     | 2 +-
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 3cbc43cfdfe..26647dfc1d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -155,6 +155,11 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
 
     @Override
     public synchronized void readMoreEntries() {
+        if (sendInProgress) {
+            // we cannot read more entries while sending the previous batch
+            // otherwise we could re-read the same entries and send duplicates
+            return;
+        }
         // totalAvailablePermits may be updated by other threads
         int currentTotalAvailablePermits = totalAvailablePermits;
         if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
index 7ed7981c937..08b680b61b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
@@ -25,7 +25,7 @@ import org.testng.annotations.Test;
 /**
  * SimpleProducerConsumerTest with {@link StreamingDispatcher}
  */
-@Test(groups = "flaky")
+@Test(groups = "broker-api")
 public class SimpleProducerConsumerTestStreamingDispatcherTest extends SimpleProducerConsumerTest {
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2f572a841b0..e19b712a85b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -113,7 +113,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-@Test(groups = "flaky")
+@Test(groups = "broker-api")
 public class SimpleProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
     private static final int TIMEOUT_MULTIPLIER = Integer.getInteger("SimpleProducerConsumerTest.receive.timeout.multiplier", 1);

Reply via email to