This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7f6972766f1 [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher) (#17163) 7f6972766f1 is described below commit 7f6972766f1f8a989f477cbb3085d0660be53bd8 Author: Nicolò Boschi <boschi1...@gmail.com> AuthorDate: Sat Aug 20 09:03:09 2022 +0200 [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher) (#17163) * [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions * move test to broker-api --- .../persistent/PersistentStreamingDispatcherMultipleConsumers.java | 5 +++++ .../SimpleProducerConsumerTestStreamingDispatcherTest.java | 2 +- .../org/apache/pulsar/client/api/SimpleProducerConsumerTest.java | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 3cbc43cfdfe..26647dfc1d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -155,6 +155,11 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi @Override public synchronized void readMoreEntries() { + if (sendInProgress) { + // we cannot read more entries while sending the previous batch + // otherwise we could re-read the same entries and send duplicates + return; + } // totalAvailablePermits may be updated by other threads int currentTotalAvailablePermits = totalAvailablePermits; if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java index 7ed7981c937..08b680b61b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java @@ -25,7 +25,7 @@ import org.testng.annotations.Test; /** * SimpleProducerConsumerTest with {@link StreamingDispatcher} */ -@Test(groups = "flaky") +@Test(groups = "broker-api") public class SimpleProducerConsumerTestStreamingDispatcherTest extends SimpleProducerConsumerTest { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 2f572a841b0..e19b712a85b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -113,7 +113,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -@Test(groups = "flaky") +@Test(groups = "broker-api") public class SimpleProducerConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); private static final int TIMEOUT_MULTIPLIER = Integer.getInteger("SimpleProducerConsumerTest.receive.timeout.multiplier", 1);