This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f4c0c53597 [Fix][broker] Limit replication rate based on bytes 
(#22674)
7f4c0c53597 is described below

commit 7f4c0c535971d5b85c48a9cd658ae0e28dc46932
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Jun 28 23:52:08 2024 +0800

    [Fix][broker] Limit replication rate based on bytes (#22674)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../service/persistent/PersistentReplicator.java   | 75 ++++++++++++++--------
 .../broker/service/ReplicatorRateLimiterTest.java  | 60 +++++++++++++++++
 2 files changed, 109 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index aa53a93da5c..54b8993784e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -34,6 +34,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.Getter;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -199,15 +201,31 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         this.cursor.setInactive();
     }
 
+    @Data
+    @AllArgsConstructor
+    private static class AvailablePermits {
+        private int messages;
+        private long bytes;
+
+        /**
+         * messages, bytes
+         * 0, O:  Producer queue is full, no permits.
+         * -1, -1:  Rate Limiter reaches limit.
+         * >0, >0:  available permits for read entries.
+         */
+        public boolean isExceeded() {
+            return messages == -1 && bytes == -1;
+        }
+
+        public boolean isReadable() {
+            return messages > 0 && bytes > 0;
+        }
+    }
+
     /**
      * Calculate available permits for read entries.
-     *
-     * @return
-     *   0:  Producer queue is full, no permits.
-     *  -1:  Rate Limiter reaches limit.
-     *  >0:  available permits for read entries.
      */
-    private int getAvailablePermits() {
+    private AvailablePermits getAvailablePermits() {
         int availablePermits = producerQueueSize - 
PENDING_MESSAGES_UPDATER.get(this);
 
         // return 0, if Producer queue is full, it will pause read entries.
@@ -216,15 +234,18 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
                 log.debug("[{}] Producer queue is full, availablePermits: {}, 
pause reading",
                         replicatorId, availablePermits);
             }
-            return 0;
+            return new AvailablePermits(0, 0);
         }
 
+        long availablePermitsOnMsg = -1;
+        long availablePermitsOnByte = -1;
+
         // handle rate limit
         if (dispatchRateLimiter.isPresent() && 
dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
             DispatchRateLimiter rateLimiter = dispatchRateLimiter.get();
             // if dispatch-rate is in msg then read only msg according to 
available permit
-            long availablePermitsOnMsg = 
rateLimiter.getAvailableDispatchRateLimitOnMsg();
-            long availablePermitsOnByte = 
rateLimiter.getAvailableDispatchRateLimitOnByte();
+            availablePermitsOnMsg = 
rateLimiter.getAvailableDispatchRateLimitOnMsg();
+            availablePermitsOnByte = 
rateLimiter.getAvailableDispatchRateLimitOnByte();
             // no permits from rate limit
             if (availablePermitsOnByte == 0 || availablePermitsOnMsg == 0) {
                 if (log.isDebugEnabled()) {
@@ -235,14 +256,18 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
                             rateLimiter.getDispatchRateOnByte(),
                             MESSAGE_RATE_BACKOFF_MS);
                 }
-                return -1;
-            }
-            if (availablePermitsOnMsg > 0) {
-                availablePermits = Math.min(availablePermits, (int) 
availablePermitsOnMsg);
+                return new AvailablePermits(-1, -1);
             }
         }
 
-        return availablePermits;
+        availablePermitsOnMsg =
+                availablePermitsOnMsg == -1 ? availablePermits : 
Math.min(availablePermits, availablePermitsOnMsg);
+        availablePermitsOnMsg = Math.min(availablePermitsOnMsg, readBatchSize);
+
+        availablePermitsOnByte =
+                availablePermitsOnByte == -1 ? readMaxSizeBytes : 
Math.min(readMaxSizeBytes, availablePermitsOnByte);
+
+        return new AvailablePermits((int) availablePermitsOnMsg, 
availablePermitsOnByte);
     }
 
     protected void readMoreEntries() {
@@ -250,10 +275,10 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
             log.info("[{}] Skip the reading due to new detected schema", 
replicatorId);
             return;
         }
-        int availablePermits = getAvailablePermits();
-
-        if (availablePermits > 0) {
-            int messagesToRead = Math.min(availablePermits, readBatchSize);
+        AvailablePermits availablePermits = getAvailablePermits();
+        if (availablePermits.isReadable()) {
+            int messagesToRead = availablePermits.getMessages();
+            long bytesToRead = availablePermits.getBytes();
             if (!isWritable()) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Throttling replication traffic because 
producer is not writable", replicatorId);
@@ -262,23 +287,21 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
                 messagesToRead = 1;
             }
 
-            // If messagesToRead is 0 or less, correct it to 1 to prevent 
IllegalArgumentException
-            messagesToRead = Math.max(messagesToRead, 1);
-
             // Schedule read
             if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Schedule read of {} messages", 
replicatorId, messagesToRead);
+                    log.debug("[{}] Schedule read of {} messages or {} bytes", 
replicatorId, messagesToRead,
+                            bytesToRead);
                 }
-                cursor.asyncReadEntriesOrWait(messagesToRead, 
readMaxSizeBytes, this,
+                cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, 
this,
                         null, topic.getMaxReadPosition());
             } else {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Not scheduling read due to pending read. 
Messages To Read {}",
-                            replicatorId, messagesToRead);
+                    log.debug("[{}] Not scheduling read due to pending read. 
Messages To Read {}, Bytes To Read {}",
+                            replicatorId, messagesToRead, bytesToRead);
                 }
             }
-        } else if (availablePermits == -1) {
+        } else if (availablePermits.isExceeded()) {
             // no permits from rate limit
             topic.getBrokerService().executor().schedule(
                 () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, 
TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 747ef3b7f5c..90df1636061 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -548,5 +549,64 @@ public class ReplicatorRateLimiterTest extends 
ReplicatorTestBase {
         producer.close();
     }
 
+    @Test
+    public void testReplicatorRateLimiterByBytes() throws Exception {
+        final String namespace = "pulsar/replicatormsg-" + 
System.currentTimeMillis();
+        final String topicName = "persistent://" + namespace + 
"/RateLimiterByBytes";
+
+        admin1.namespaces().createNamespace(namespace);
+        // 0. set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+
+        final int byteRate = 400;
+        final int payloadSize = 100;
+        DispatchRate dispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(byteRate)
+                .ratePeriodInSecond(360)
+                .build();
+        admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate);
+
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).build();
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        Awaitility.await()
+                .untilAsserted(() -> 
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()));
+        
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
 byteRate);
+
+        @Cleanup
+        PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString())
+                .build();
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+
+        @Cleanup
+        Consumer<byte[]> ignored = 
client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2")
+                .messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", 
receivedMessage);
+                    totalReceived.incrementAndGet();
+                }).subscribe();
+
+        // The total bytes is 5 times the rate limit value.
+        int numMessages = byteRate / payloadSize * 5;
+        for (int i = 0; i < numMessages * payloadSize; i++) {
+            producer.send(new byte[payloadSize]);
+        }
+
+        Awaitility.await().pollDelay(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    // The rate limit occurs in the next reading cycle, so a 
value fault tolerance needs to be added.
+                    assertThat(totalReceived.get()).isLessThan((byteRate / 
payloadSize) + 2);
+                });
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ReplicatorRateLimiterTest.class);
 }

Reply via email to