This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f6972766f1 [fix][broker] Avoid messages being repeatedly replayed
with SHARED subscriptions (streaming dispatcher) (#17163)
7f6972766f1 is described below
commit 7f6972766f1f8a989f477cbb3085d0660be53bd8
Author: Nicolò Boschi <[email protected]>
AuthorDate: Sat Aug 20 09:03:09 2022 +0200
[fix][broker] Avoid messages being repeatedly replayed with SHARED
subscriptions (streaming dispatcher) (#17163)
* [fix][broker] Avoid messages being repeatedly replayed with SHARED
subscriptions
* move test to broker-api
---
.../persistent/PersistentStreamingDispatcherMultipleConsumers.java | 5 +++++
.../SimpleProducerConsumerTestStreamingDispatcherTest.java | 2 +-
.../org/apache/pulsar/client/api/SimpleProducerConsumerTest.java | 2 +-
3 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 3cbc43cfdfe..26647dfc1d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -155,6 +155,11 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
@Override
public synchronized void readMoreEntries() {
+ if (sendInProgress) {
+ // we cannot read more entries while sending the previous batch
+ // otherwise we could re-read the same entries and send duplicates
+ return;
+ }
// totalAvailablePermits may be updated by other threads
int currentTotalAvailablePermits = totalAvailablePermits;
if (currentTotalAvailablePermits > 0 &&
isAtleastOneConsumerAvailable()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
index 7ed7981c937..08b680b61b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java
@@ -25,7 +25,7 @@ import org.testng.annotations.Test;
/**
* SimpleProducerConsumerTest with {@link StreamingDispatcher}
*/
-@Test(groups = "flaky")
+@Test(groups = "broker-api")
public class SimpleProducerConsumerTestStreamingDispatcherTest extends
SimpleProducerConsumerTest {
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2f572a841b0..e19b712a85b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -113,7 +113,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-@Test(groups = "flaky")
+@Test(groups = "broker-api")
public class SimpleProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
private static final int TIMEOUT_MULTIPLIER =
Integer.getInteger("SimpleProducerConsumerTest.receive.timeout.multiplier", 1);