This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 0c6e809 [Branch-2.7] Fix broker dispatch byte rate limiter (#11249)
0c6e809 is described below
commit 0c6e809c1ba0d667dd1a0ef87f49eb5a38b631f6
Author: ran <[email protected]>
AuthorDate: Fri Jul 9 10:26:34 2021 +0800
[Branch-2.7] Fix broker dispatch byte rate limiter (#11249)
The PR #11135 couldn't be cherry-picked to branch-2.7, because there are
too many conflicts.
## Motivation
fix https://github.com/apache/pulsar/issues/11044
now dispatcher byte rate limit don't limit every cursor read. When cursor
read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It
will cause that dispatcher read entries by
`ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.
## implement
when cursor read entries size need to calculate, the calculate result by
dispatcher bytes limiter.
---
.../broker/service/AbstractBaseDispatcher.java | 20 ++++++++++-
.../AbstractDispatcherMultipleConsumers.java | 5 +--
.../AbstractDispatcherSingleActiveConsumer.java | 5 +--
.../NonPersistentDispatcherMultipleConsumers.java | 5 +--
...onPersistentDispatcherSingleActiveConsumer.java | 6 ++--
.../service/persistent/DispatchRateLimiter.java | 9 +++++
.../PersistentDispatcherMultipleConsumers.java | 32 ++++++++---------
.../PersistentDispatcherSingleActiveConsumer.java | 34 +++++++++---------
.../client/api/MessageDispatchThrottlingTest.java | 41 ++++++++--------------
.../SubscriptionMessageDispatchThrottlingTest.java | 8 ++---
10 files changed, 89 insertions(+), 76 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 997ce5c..627e4e8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -48,8 +49,11 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
protected final Subscription subscription;
- protected AbstractBaseDispatcher(Subscription subscription) {
+ protected final ServiceConfiguration serviceConfig;
+
+ protected AbstractBaseDispatcher(Subscription subscription,
ServiceConfiguration serviceConfig) {
this.subscription = subscription;
+ this.serviceConfig = serviceConfig;
}
/**
@@ -235,4 +239,18 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
}
});
}
+
+ protected static Pair<Integer, Long> computeReadLimits(int messagesToRead,
int availablePermitsOnMsg,
+ long bytesToRead,
long availablePermitsOnByte) {
+ if (availablePermitsOnMsg > 0) {
+ messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
+ }
+
+ if (availablePermitsOnByte > 0) {
+ bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
+ }
+
+ return Pair.of(messagesToRead, bytesToRead);
+ }
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index 434ff2d..44162d9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -26,6 +26,7 @@ import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.pulsar.broker.ServiceConfiguration;
import
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.slf4j.Logger;
@@ -47,8 +48,8 @@ public abstract class AbstractDispatcherMultipleConsumers
extends AbstractBaseDi
private Random random = new Random(42);
- protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
- super(subscription);
+ protected AbstractDispatcherMultipleConsumers(Subscription subscription,
ServiceConfiguration serviceConfig) {
+ super(subscription, serviceConfig);
}
public boolean isConsumerConnected() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 559ba90..a321b64 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.pulsar.broker.ServiceConfiguration;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -56,8 +57,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer
extends AbstractBas
private volatile int isClosed = FALSE;
public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType,
int partitionIndex,
- String topicName, Subscription subscription) {
- super(subscription);
+ String topicName, Subscription subscription, ServiceConfiguration
serviceConfig) {
+ super(subscription, serviceConfig);
this.topicName = topicName;
this.consumers = new CopyOnWriteArrayList<>();
this.partitionIndex = partitionIndex;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 2943636..3bf1f9d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -56,16 +55,14 @@ public class NonPersistentDispatcherMultipleConsumers
extends AbstractDispatcher
@SuppressWarnings("unused")
private volatile int totalAvailablePermits = 0;
- private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;
public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic,
Subscription subscription) {
- super(subscription);
+ super(subscription,
topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.name = topic.getName() + " / " + subscription.getName();
this.msgDrop = new Rate();
- this.serviceConfig =
topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker =
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 1b51e0d..69e9a95 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -25,7 +25,6 @@ import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
@@ -47,16 +46,15 @@ public final class
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
private final NonPersistentTopic topic;
private final Rate msgDrop;
private final Subscription subscription;
- private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;
public NonPersistentDispatcherSingleActiveConsumer(SubType
subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
- super(subscriptionType, partitionIndex, topic.getName(), subscription);
+ super(subscriptionType, partitionIndex, topic.getName(), subscription,
+ topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.msgDrop = new Rate();
- this.serviceConfig =
topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker =
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 2661727..b2447a2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -407,6 +407,15 @@ public class DispatchRateLimiter {
|| dispatchRate.dispatchThrottlingRateInByte > 0);
}
+ /**
+ * returns available byte-permit if msg-dispatch-throttling is enabled
else it returns -1.
+ *
+ * @return
+ */
+ public long getAvailableDispatchRateLimitOnByte() {
+ return dispatchRateLimiterOnByte == null ? -1 :
dispatchRateLimiterOnByte.getAvailablePermits();
+ }
+
public void close() {
// close rate-limiter
if (dispatchRateLimiterOnMessage != null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 3f61156..6b2b4ec 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -43,7 +43,7 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
import
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
@@ -103,7 +103,6 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
private static final
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"blockedDispatcherOnUnackedMsgs");
- protected final ServiceConfiguration serviceConfig;
protected Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
enum ReadType {
@@ -111,8 +110,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
public PersistentDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor, Subscription subscription) {
- super(subscription);
- this.serviceConfig =
topic.getBrokerService().pulsar().getConfiguration();
+ super(subscription,
topic.getBrokerService().pulsar().getConfiguration());
this.cursor = cursor;
this.lastIndividualDeletedRangeFromCursorRecovery =
cursor.getLastIndividualDeletedRange();
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
@@ -250,6 +248,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
int currentTotalAvailablePermits = totalAvailablePermits;
if (currentTotalAvailablePermits > 0 &&
isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(currentTotalAvailablePermits,
readBatchSize);
+ long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
Consumer c = getRandomConsumer();
// if turn on precise dispatcher flow control, adjust the record
to read
@@ -281,11 +280,12 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
TimeUnit.MILLISECONDS);
return;
} else {
- // if dispatch-rate is in msg then read only msg
according to available permit
- long availablePermitsOnMsg =
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
- if (availablePermitsOnMsg > 0) {
- messagesToRead = Math.min(messagesToRead, (int)
availablePermitsOnMsg);
- }
+ Pair<Integer, Long> calculateResult =
computeReadLimits(
+ messagesToRead, (int)
topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+ bytesToRead,
topicRateLimiter.getAvailableDispatchRateLimitOnByte());
+
+ messagesToRead = calculateResult.getLeft();
+ bytesToRead = calculateResult.getRight();
}
}
@@ -300,11 +300,11 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
TimeUnit.MILLISECONDS);
return;
} else {
- // if dispatch-rate is in msg then read only msg
according to available permit
- long availablePermitsOnMsg =
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
- if (availablePermitsOnMsg > 0) {
- messagesToRead = Math.min(messagesToRead, (int)
availablePermitsOnMsg);
- }
+ Pair<Integer, Long> calculateResult =
computeReadLimits(
+ messagesToRead, (int)
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
+ bytesToRead,
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
+ messagesToRead = calculateResult.getLeft();
+ bytesToRead = calculateResult.getRight();
}
}
@@ -319,6 +319,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
// If messagesToRead is 0 or less, correct it to 1 to prevent
IllegalArgumentException
messagesToRead = Math.max(messagesToRead, 1);
+ bytesToRead = Math.max(bytesToRead, 1);
Set<PositionImpl> messagesToReplayNow =
getMessagesToReplayNow(messagesToRead);
if (!messagesToReplayNow.isEmpty()) {
@@ -351,8 +352,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
consumerList.size());
}
havePendingRead = true;
- cursor.asyncReadEntriesOrWait(messagesToRead,
serviceConfig.getDispatcherMaxReadSizeBytes(), this,
- ReadType.Normal);
+ cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead,
this, ReadType.Normal);
} else {
log.debug("[{}] Cannot schedule next read until previous one
is done", name);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 2930a33..4be78e8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -38,7 +38,7 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
import
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
-import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
@@ -74,7 +74,6 @@ public final class PersistentDispatcherSingleActiveConsumer
extends AbstractDisp
private volatile int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15,
TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
- private final ServiceConfiguration serviceConfig;
private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
LongPairSet messagesToRedeliver;
@@ -83,12 +82,12 @@ public final class PersistentDispatcherSingleActiveConsumer
extends AbstractDisp
public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor,
SubType subscriptionType, int partitionIndex,
PersistentTopic topic, Subscription subscription) {
- super(subscriptionType, partitionIndex, topic.getName(), subscription);
+ super(subscriptionType, partitionIndex, topic.getName(), subscription,
+ topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.name = topic.getName() + " / " + (cursor.getName() != null ?
Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.cursor = cursor;
- this.serviceConfig =
topic.getBrokerService().pulsar().getConfiguration();
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.redeliveryTracker =
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
@@ -383,6 +382,7 @@ public final class PersistentDispatcherSingleActiveConsumer
extends AbstractDisp
}
int messagesToRead = Math.min(availablePermits, readBatchSize);
+ long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
// if turn of precise dispatcher flow control, adjust the records
to read
if (consumer.isPreciseDispatcherFlowControl()) {
int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
@@ -415,11 +415,12 @@ public final class
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
return;
} else {
- // if dispatch-rate is in msg then read only msg
according to available permit
- long availablePermitsOnMsg =
topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
- if (availablePermitsOnMsg > 0) {
- messagesToRead = Math.min(messagesToRead, (int)
availablePermitsOnMsg);
- }
+ Pair<Integer, Long> calculateResult =
computeReadLimits(
+ messagesToRead, (int)
topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+ bytesToRead,
topicRateLimiter.getAvailableDispatchRateLimitOnByte());
+
+ messagesToRead = calculateResult.getLeft();
+ bytesToRead = calculateResult.getRight();
}
}
@@ -443,18 +444,19 @@ public final class
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
return;
} else {
- // if dispatch-rate is in msg then read only msg
according to available permit
- long subPermitsOnMsg =
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
- if (subPermitsOnMsg > 0) {
- messagesToRead = Math.min(messagesToRead, (int)
subPermitsOnMsg);
- }
+ Pair<Integer, Long> calculateResult =
computeReadLimits(
+ messagesToRead, (int)
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
+ bytesToRead,
dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
+
+ messagesToRead = calculateResult.getLeft();
+ bytesToRead = calculateResult.getRight();
}
}
}
// If messagesToRead is 0 or less, correct it to 1 to prevent
IllegalArgumentException
messagesToRead = Math.max(messagesToRead, 1);
-
+ bytesToRead = Math.max(bytesToRead, 1);
// Schedule read
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Schedule read of {} messages", name,
consumer, messagesToRead);
@@ -479,7 +481,7 @@ public final class PersistentDispatcherSingleActiveConsumer
extends AbstractDisp
} else if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor,
messagesToRead, this, consumer);
} else {
- cursor.asyncReadEntriesOrWait(messagesToRead,
serviceConfig.getDispatcherMaxReadSizeBytes(), this, consumer);
+ cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead,
this, consumer);
}
} else {
if (log.isDebugEnabled()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 1383bc3..ffdd8f3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -41,6 +41,7 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -390,56 +391,42 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
*/
@Test(dataProvider = "subscriptions", timeOut = 5000)
public void
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType
subscription) throws Exception {
+ conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
log.info("-- Starting {} test --", methodName);
final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace +
"/throttlingAll";
+ final String subscriptionName = "my-subscriber-name";
- final int byteRate = 100;
+ final int byteRate = 250;
DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setDispatchRate(namespace, dispatchRate);
+ admin.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
// create producer and topic
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
- boolean isMessageRateUpdate = false;
- int retry = 5;
- for (int i = 0; i < retry; i++) {
- if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() >
0) {
- isMessageRateUpdate = true;
- break;
- } else {
- if (i != retry - 1) {
- Thread.sleep(100);
- }
- }
- }
- Assert.assertTrue(isMessageRateUpdate);
+ Awaitility.await().until(() ->
topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace),
dispatchRate);
final int numProducedMessages = 20;
- final CountDownLatch latch = new CountDownLatch(numProducedMessages);
final AtomicInteger totalReceived = new AtomicInteger(0);
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+ for (int i = 0; i < numProducedMessages; i++) {
+ producer.send(new byte[99]);
+ }
+
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener",
receivedMessage);
totalReceived.incrementAndGet();
- latch.countDown();
}).subscribe();
- // deactive cursors
- deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());
- // Asynchronously produce messages
- for (int i = 0; i < numProducedMessages; i++) {
- producer.send(new byte[byteRate / 10]);
- }
-
- latch.await();
- Assert.assertEquals(totalReceived.get(), numProducedMessages);
+ Awaitility.await().atLeast(3, TimeUnit.SECONDS)
+ .atMost(5, TimeUnit.SECONDS).until(() -> totalReceived.get() >
6 && totalReceived.get() < 10);
consumer.close();
producer.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 7ed88f5..f678959 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -210,8 +210,8 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
* verify rate-limiting should throttle message-dispatching based on
byte-rate
*
* <pre>
- * 1. dispatch-byte-rate = 100 bytes/sec
- * 2. send 30 msgs : each with 10 byte
+ * 1. dispatch-byte-rate = 1500 bytes/sec
+ * 2. send 30 msgs : each with 150 byte
* 3. it should take up to 2 second to receive all messages
* </pre>
*
@@ -226,7 +226,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
final String topicName = "persistent://" + namespace +
"/throttlingAll-" + System.nanoTime();
final String subName = "my-subscriber-name-" + subscription;
- final int byteRate = 100;
+ final int byteRate = 1500;
DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace,
dispatchRate);
@@ -241,7 +241,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
- log.debug("Received message [{}] in the listener",
receivedMessage);
+ log.info("Received message [{}] in the listener",
receivedMessage);
totalReceived.incrementAndGet();
latch.countDown();
}).subscribe();