This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f4c0c53597 [Fix][broker] Limit replication rate based on bytes
(#22674)
7f4c0c53597 is described below
commit 7f4c0c535971d5b85c48a9cd658ae0e28dc46932
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Jun 28 23:52:08 2024 +0800
[Fix][broker] Limit replication rate based on bytes (#22674)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../service/persistent/PersistentReplicator.java | 75 ++++++++++++++--------
.../broker/service/ReplicatorRateLimiterTest.java | 60 +++++++++++++++++
2 files changed, 109 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index aa53a93da5c..54b8993784e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -34,6 +34,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import lombok.Getter;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -199,15 +201,31 @@ public abstract class PersistentReplicator extends
AbstractReplicator
this.cursor.setInactive();
}
+ @Data
+ @AllArgsConstructor
+ private static class AvailablePermits {
+ private int messages;
+ private long bytes;
+
+ /**
+ * Permit states, as (messages, bytes):
+ * 0, 0: Producer queue is full, no permits.
+ * -1, -1: Rate Limiter reaches limit.
+ * >0, >0: available permits for read entries.
+ */
+ public boolean isExceeded() {
+ return messages == -1 && bytes == -1;
+ }
+
+ public boolean isReadable() {
+ return messages > 0 && bytes > 0;
+ }
+ }
+
/**
* Calculate available permits for read entries.
- *
- * @return
- * 0: Producer queue is full, no permits.
- * -1: Rate Limiter reaches limit.
- * >0: available permits for read entries.
*/
- private int getAvailablePermits() {
+ private AvailablePermits getAvailablePermits() {
int availablePermits = producerQueueSize -
PENDING_MESSAGES_UPDATER.get(this);
// return 0, if Producer queue is full, it will pause read entries.
@@ -216,15 +234,18 @@ public abstract class PersistentReplicator extends
AbstractReplicator
log.debug("[{}] Producer queue is full, availablePermits: {},
pause reading",
replicatorId, availablePermits);
}
- return 0;
+ return new AvailablePermits(0, 0);
}
+ long availablePermitsOnMsg = -1;
+ long availablePermitsOnByte = -1;
+
// handle rate limit
if (dispatchRateLimiter.isPresent() &&
dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
DispatchRateLimiter rateLimiter = dispatchRateLimiter.get();
// if dispatch-rate is in msg then read only msg according to
available permit
- long availablePermitsOnMsg =
rateLimiter.getAvailableDispatchRateLimitOnMsg();
- long availablePermitsOnByte =
rateLimiter.getAvailableDispatchRateLimitOnByte();
+ availablePermitsOnMsg =
rateLimiter.getAvailableDispatchRateLimitOnMsg();
+ availablePermitsOnByte =
rateLimiter.getAvailableDispatchRateLimitOnByte();
// no permits from rate limit
if (availablePermitsOnByte == 0 || availablePermitsOnMsg == 0) {
if (log.isDebugEnabled()) {
@@ -235,14 +256,18 @@ public abstract class PersistentReplicator extends
AbstractReplicator
rateLimiter.getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
- return -1;
- }
- if (availablePermitsOnMsg > 0) {
- availablePermits = Math.min(availablePermits, (int)
availablePermitsOnMsg);
+ return new AvailablePermits(-1, -1);
}
}
- return availablePermits;
+ availablePermitsOnMsg =
+ availablePermitsOnMsg == -1 ? availablePermits :
Math.min(availablePermits, availablePermitsOnMsg);
+ availablePermitsOnMsg = Math.min(availablePermitsOnMsg, readBatchSize);
+
+ availablePermitsOnByte =
+ availablePermitsOnByte == -1 ? readMaxSizeBytes :
Math.min(readMaxSizeBytes, availablePermitsOnByte);
+
+ return new AvailablePermits((int) availablePermitsOnMsg,
availablePermitsOnByte);
}
protected void readMoreEntries() {
@@ -250,10 +275,10 @@ public abstract class PersistentReplicator extends
AbstractReplicator
log.info("[{}] Skip the reading due to new detected schema",
replicatorId);
return;
}
- int availablePermits = getAvailablePermits();
-
- if (availablePermits > 0) {
- int messagesToRead = Math.min(availablePermits, readBatchSize);
+ AvailablePermits availablePermits = getAvailablePermits();
+ if (availablePermits.isReadable()) {
+ int messagesToRead = availablePermits.getMessages();
+ long bytesToRead = availablePermits.getBytes();
if (!isWritable()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Throttling replication traffic because
producer is not writable", replicatorId);
@@ -262,23 +287,21 @@ public abstract class PersistentReplicator extends
AbstractReplicator
messagesToRead = 1;
}
- // If messagesToRead is 0 or less, correct it to 1 to prevent
IllegalArgumentException
- messagesToRead = Math.max(messagesToRead, 1);
-
// Schedule read
if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Schedule read of {} messages",
replicatorId, messagesToRead);
+ log.debug("[{}] Schedule read of {} messages or {} bytes",
replicatorId, messagesToRead,
+ bytesToRead);
}
- cursor.asyncReadEntriesOrWait(messagesToRead,
readMaxSizeBytes, this,
+ cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead,
this,
null, topic.getMaxReadPosition());
} else {
if (log.isDebugEnabled()) {
- log.debug("[{}] Not scheduling read due to pending read.
Messages To Read {}",
- replicatorId, messagesToRead);
+ log.debug("[{}] Not scheduling read due to pending read.
Messages To Read {}, Bytes To Read {}",
+ replicatorId, messagesToRead, bytesToRead);
}
}
- } else if (availablePermits == -1) {
+ } else if (availablePermits.isExceeded()) {
// no permits from rate limit
topic.getBrokerService().executor().schedule(
() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 747ef3b7f5c..90df1636061 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -548,5 +549,64 @@ public class ReplicatorRateLimiterTest extends
ReplicatorTestBase {
producer.close();
}
+ @Test
+ public void testReplicatorRateLimiterByBytes() throws Exception {
+ final String namespace = "pulsar/replicatormsg-" +
System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace +
"/RateLimiterByBytes";
+
+ admin1.namespaces().createNamespace(namespace);
+ // 0. set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
+
+ final int byteRate = 400;
+ final int payloadSize = 100;
+ DispatchRate dispatchRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(byteRate)
+ .ratePeriodInSecond(360)
+ .build();
+ admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate);
+
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).build();
+ @Cleanup
+ Producer<byte[]> producer = client1.newProducer().topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+ Awaitility.await()
+ .untilAsserted(() ->
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()));
+
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
byteRate);
+
+ @Cleanup
+ PulsarClient client2 =
PulsarClient.builder().serviceUrl(url2.toString())
+ .build();
+ final AtomicInteger totalReceived = new AtomicInteger(0);
+
+ @Cleanup
+ Consumer<byte[]> ignored =
client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2")
+ .messageListener((c1, msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in the listener",
receivedMessage);
+ totalReceived.incrementAndGet();
+ }).subscribe();
+
+ // Send numMessages payloads so the total bytes is 5 times the rate limit value.
+ int numMessages = byteRate / payloadSize * 5;
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(new byte[payloadSize]);
+ }
+
+ Awaitility.await().pollDelay(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ // The rate limit occurs in the next reading cycle, so a
value fault tolerance needs to be added.
+ assertThat(totalReceived.get()).isLessThan((byteRate /
payloadSize) + 2);
+ });
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ReplicatorRateLimiterTest.class);
}