This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0517423b0a8 [fix][broker] Fix dispatch duplicated messages with
`Exclusive` mode. (#17237)
0517423b0a8 is described below
commit 0517423b0a8d9c981cc5550abfec9e60b55bf3e7
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Aug 24 19:50:06 2022 +0800
[fix][broker] Fix dispatch duplicated messages with `Exclusive` mode.
(#17237)
---
.../PersistentDispatcherSingleActiveConsumer.java | 71 ++++++++++--------
...entStreamingDispatcherSingleActiveConsumer.java | 54 +++++++++-----
.../SubscriptionMessageDispatchThrottlingTest.java | 86 ++++++++++++++++++++++
3 files changed, 161 insertions(+), 50 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 608b0fa503f..c9d810357c2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -58,6 +59,7 @@ import org.slf4j.LoggerFactory;
public class PersistentDispatcherSingleActiveConsumer extends
AbstractDispatcherSingleActiveConsumer
implements Dispatcher, ReadEntriesCallback {
+ private final AtomicBoolean isRescheduleReadInProgress = new
AtomicBoolean(false);
protected final PersistentTopic topic;
protected final String name;
private Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
@@ -242,14 +244,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
SafeRun.safeRun(() -> {
synchronized
(PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
- if (newConsumer != null && !havePendingRead) {
- readMoreEntries(newConsumer);
- } else {
- log.debug(
- "[{}-{}] Ignoring write future
complete."
- + " consumerAvailable={}
havePendingRead={}",
- name, newConsumer, newConsumer !=
null, havePendingRead);
- }
+ readMoreEntries(newConsumer);
}
}));
}
@@ -336,25 +331,40 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
// consumer can be null when all consumers are disconnected from
broker.
// so skip reading more entries if currently there is no active
consumer.
if (null == consumer) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to the
current consumer is null", topic.getName());
+ }
+ return;
+ }
+ if (havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to we have
pending read.", topic.getName());
+ }
return;
}
if (consumer.getAvailablePermits() > 0) {
- Pair<Integer, Long> calculateResult = calculateToRead(consumer);
- int messagesToRead = calculateResult.getLeft();
- long bytesToRead = calculateResult.getRight();
+ synchronized (this) {
+ if (havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to we
have pending read.", topic.getName());
+ }
+ return;
+ }
- if (-1 == messagesToRead || bytesToRead == -1) {
- // Skip read as topic/dispatcher has exceed the dispatch rate.
- return;
- }
+ Pair<Integer, Long> calculateResult =
calculateToRead(consumer);
+ int messagesToRead = calculateResult.getLeft();
+ long bytesToRead = calculateResult.getRight();
- // Schedule read
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Schedule read of {} messages", name,
consumer, messagesToRead);
- }
+ if (-1 == messagesToRead || bytesToRead == -1) {
+ // Skip read as topic/dispatcher has exceeded the dispatch
rate.
+ return;
+ }
- synchronized (this) {
+ // Schedule read
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Schedule read of {} messages", name,
consumer, messagesToRead);
+ }
havePendingRead = true;
if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor,
messagesToRead, isFirstRead,
@@ -375,19 +385,16 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
protected void reScheduleRead() {
- topic.getBrokerService().executor().schedule(() -> {
- Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
- if (currentConsumer != null && !havePendingRead) {
- readMoreEntries(currentConsumer);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping read retry for topic: Current
Consumer {},"
- + " havePendingRead {}",
- topic.getName(), currentConsumer, havePendingRead);
- }
+ if (isRescheduleReadInProgress.compareAndSet(false, true)) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Reschedule message read in {} ms",
topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
}
- }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-
+ topic.getBrokerService().executor().schedule(() -> {
+ isRescheduleReadInProgress.set(false);
+ Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ readMoreEntries(currentConsumer);
+ }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+ }
}
protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 01ef7216d20..2048bb016b8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -179,31 +179,49 @@ public class
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
// consumer can be null when all consumers are disconnected from
broker.
// so skip reading more entries if currently there is no active
consumer.
if (null == consumer) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to the
current consumer is null", topic.getName());
+ }
+ return;
+ }
+ if (havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to we have
pending read.", topic.getName());
+ }
return;
}
- if (!havePendingRead && consumer.getAvailablePermits() > 0) {
- Pair<Integer, Long> calculateResult = calculateToRead(consumer);
- int messagesToRead = calculateResult.getLeft();
- long bytesToRead = calculateResult.getRight();
+ if (consumer.getAvailablePermits() > 0) {
+ synchronized (this) {
+ if (havePendingRead) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read for the topic, Due to we
have pending read.", topic.getName());
+ }
+ return;
+ }
+
+ Pair<Integer, Long> calculateResult =
calculateToRead(consumer);
+ int messagesToRead = calculateResult.getLeft();
+ long bytesToRead = calculateResult.getRight();
- if (-1 == messagesToRead || bytesToRead == -1) {
- // Skip read as topic/dispatcher has exceed the dispatch rate.
- return;
- }
+ if (-1 == messagesToRead || bytesToRead == -1) {
+ // Skip read as topic/dispatcher has exceeded the dispatch
rate.
+ return;
+ }
- // Schedule read
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Schedule read of {} messages", name,
consumer, messagesToRead);
- }
- havePendingRead = true;
+ // Schedule read
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Schedule read of {} messages", name,
consumer, messagesToRead);
+ }
+ havePendingRead = true;
- if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor,
messagesToRead, isFirstRead,
- this, consumer);
- } else {
- streamingEntryReader.asyncReadEntries(messagesToRead,
bytesToRead, consumer);
+ if (consumer.readCompacted()) {
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor,
messagesToRead, isFirstRead,
+ this, consumer);
+ } else {
+ streamingEntryReader.asyncReadEntries(messagesToRead,
bytesToRead, consumer);
+ }
}
} else {
if (log.isDebugEnabled()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 9e65eecb8ee..3d1060a406c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -220,6 +220,92 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
log.info("-- Exiting {} test --", methodName);
}
+ @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount =
15)
+ private void testMessageNotDuplicated(SubscriptionType subscription)
throws Exception {
+ int brokerRate = 1000;
+ int topicRate = 5000;
+ int subRate = 10000;
+ int expectRate = 1000;
+ final String namespace = "my-property/throttling_ns_non_dup";
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/throttlingAll");
+ final String subName = "my-subscriber-name-" + subscription;
+
+ DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(subRate)
+ .ratePeriodInSecond(1)
+ .build();
+ DispatchRate topicDispatchRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(topicRate)
+ .ratePeriodInSecond(1)
+ .build();
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ admin.namespaces().setSubscriptionDispatchRate(namespace,
subscriptionDispatchRate);
+ admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" +
brokerRate);
+
+ final int numProducedMessages = 30;
+ final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+ final AtomicInteger totalReceived = new AtomicInteger(0);
+ // enable throttling for nonBacklog consumers
+ conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(subscription).messageListener((c1, msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in the listener",
receivedMessage);
+ totalReceived.incrementAndGet();
+ latch.countDown();
+ }).subscribe();
+
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+ DispatchRateLimiter subRateLimiter = null;
+ Dispatcher subDispatcher =
topic.getSubscription(subName).getDispatcher();
+ if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+ subRateLimiter = subDispatcher.getRateLimiter().get();
+ } else if (subDispatcher instanceof
PersistentDispatcherSingleActiveConsumer) {
+ subRateLimiter = subDispatcher.getRateLimiter().get();
+ } else {
+ Assert.fail("Should only have PersistentDispatcher in this test");
+ }
+ final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+ Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+ DispatchRateLimiter brokerDispatchRateLimiter =
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+ Assert.assertTrue(brokerDispatchRateLimiter != null
+ && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ DispatchRateLimiter topicDispatchRateLimiter =
topic.getDispatchRateLimiter().orElse(null);
+ Assert.assertTrue(topicDispatchRateLimiter != null
+ && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ Assert.assertTrue(subDispatchRateLimiter != null
+ && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ });
+
+
Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+ .getDispatchThrottlingRateInByte(), subRate);
+ Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+ .getDispatchThrottlingRateInByte(), topicRate);
+
+ for (int i = 0; i < numProducedMessages; i++) {
+ producer.send(new byte[expectRate / 10]);
+ }
+
+ latch.await();
+ // Wait 2000 milli sec to check if we can get more than 30 messages.
+ Thread.sleep(2000);
+ // If this assertion fails, please raise an alert: we may have a regression
that causes messages to be dispatched more than once.
+ Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
+
+ consumer.close();
+ producer.close();
+ admin.topics().delete(topicName, true);
+ admin.namespaces().deleteNamespace(namespace);
+ }
+
/**
* verify rate-limiting should throttle message-dispatching based on
byte-rate
*