This is an automated email from the ASF dual-hosted git repository.
lushiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7f1842bd66 Replication throttle rate support dynamic set (#3157)
7f1842bd66 is described below
commit 7f1842bd6693904706d96fdf0c2977a4e3f49761
Author: gaozhangmin <[email protected]>
AuthorDate: Mon Oct 17 21:53:34 2022 +0800
Replication throttle rate support dynamic set (#3157)
Replication throttle rate support dynamic set (#3157)
---
.../bookkeeper/client/LedgerFragmentReplicator.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 0841c720a7..82daf39fc3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -111,6 +111,7 @@ public class LedgerFragmentReplicator {
private static final int INITIAL_AVERAGE_ENTRY_SIZE = 1024;
private static final double AVERAGE_ENTRY_SIZE_RATIO = 0.8;
+ private ClientConfiguration conf;
public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger,
ClientConfiguration conf) {
this.bkc = bkc;
@@ -125,6 +126,7 @@ public class LedgerFragmentReplicator {
this.replicationThrottle = new Throttler(conf.getReplicationRateByBytes());
}
averageEntrySize = new AtomicInteger(INITIAL_AVERAGE_ENTRY_SIZE);
+ this.conf = conf;
}
public LedgerFragmentReplicator(BookKeeper bkc, ClientConfiguration conf) {
@@ -183,6 +185,9 @@ public class LedgerFragmentReplicator {
MultiCallback ledgerFragmentEntryMcb = new MultiCallback(
entriesToReplicate.size(), ledgerFragmentMcb, null, BKException.Code.OK,
BKException.Code.LedgerRecoveryException);
+ if (this.replicationThrottle != null) {
+ this.replicationThrottle.resetRate(this.conf.getReplicationRateByBytes());
+ }
for (final Long entryId : entriesToReplicate) {
recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
newBookies, onReadEntryFailureCallback);
@@ -508,6 +513,16 @@ public class LedgerFragmentReplicator {
this.rateLimiter = RateLimiter.create(throttleBytes);
}
+ // reset rate of limiter before compact one entry log file
+ void resetRate(int throttleBytes) {
+ this.rateLimiter.setRate(throttleBytes);
+ }
+
+ // get rate of limiter for unit test
+ double getRate() {
+ return this.rateLimiter.getRate();
+ }
+
// acquire. if bybytes: bytes of this entry; if byentries: 1.
void acquire(int permits) {
rateLimiter.acquire(permits);