This is an automated email from the ASF dual-hosted git repository.

lushiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f1842bd66 Replication throttle rate support dynamic set (#3157)
7f1842bd66 is described below

commit 7f1842bd6693904706d96fdf0c2977a4e3f49761
Author: gaozhangmin <[email protected]>
AuthorDate: Mon Oct 17 21:53:34 2022 +0800

    Replication throttle rate support dynamic set (#3157)
    
    Replication throttle rate support dynamic set (#3157)
---
 .../bookkeeper/client/LedgerFragmentReplicator.java       | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 0841c720a7..82daf39fc3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -111,6 +111,7 @@ public class LedgerFragmentReplicator {
 
     private static final int INITIAL_AVERAGE_ENTRY_SIZE = 1024;
     private static final double AVERAGE_ENTRY_SIZE_RATIO = 0.8;
+    private ClientConfiguration conf;
 
     public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger, 
ClientConfiguration conf) {
         this.bkc = bkc;
@@ -125,6 +126,7 @@ public class LedgerFragmentReplicator {
             this.replicationThrottle = new 
Throttler(conf.getReplicationRateByBytes());
         }
         averageEntrySize = new AtomicInteger(INITIAL_AVERAGE_ENTRY_SIZE);
+        this.conf = conf;
     }
 
     public LedgerFragmentReplicator(BookKeeper bkc, ClientConfiguration conf) {
@@ -183,6 +185,9 @@ public class LedgerFragmentReplicator {
         MultiCallback ledgerFragmentEntryMcb = new MultiCallback(
                 entriesToReplicate.size(), ledgerFragmentMcb, null, 
BKException.Code.OK,
                 BKException.Code.LedgerRecoveryException);
+        if (this.replicationThrottle != null) {
+            
this.replicationThrottle.resetRate(this.conf.getReplicationRateByBytes());
+        }
         for (final Long entryId : entriesToReplicate) {
             recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
                     newBookies, onReadEntryFailureCallback);
@@ -508,6 +513,16 @@ public class LedgerFragmentReplicator {
             this.rateLimiter = RateLimiter.create(throttleBytes);
         }
 
+        // reset rate of limiter before replicating the entries of a ledger fragment
+        void resetRate(int throttleBytes) {
+            this.rateLimiter.setRate(throttleBytes);
+        }
+
+        // return the rate currently reported by the limiter; used by unit tests
+        double getRate() {
+            return this.rateLimiter.getRate();
+        }
+
         // acquire. if bybytes: bytes of this entry; if byentries: 1.
         void acquire(int permits) {
             rateLimiter.acquire(permits);

Reply via email to