This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 99502365d2b IoTConsensus Transit Snapshot Rate Limiter (#12348)
99502365d2b is described below
commit 99502365d2b46c68f0b5a8ff4c229f34adcc5123
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Apr 17 17:10:26 2024 +0800
IoTConsensus Transit Snapshot Rate Limiter (#12348)
---
.../iotdb/consensus/config/IoTConsensusConfig.java | 19 ++++++-
.../apache/iotdb/consensus/iot/IoTConsensus.java | 8 +++
.../consensus/iot/IoTConsensusServerImpl.java | 4 ++
.../iot/snapshot/IoTConsensusRateLimiter.java | 59 ++++++++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 +++
.../db/consensus/DataRegionConsensusImpl.java | 2 +
.../storageengine/rescon/quotas/QuotaLimiter.java | 3 ++
.../resources/conf/iotdb-common.properties | 4 ++
.../quotas/AverageIntervalRateLimiter.java | 2 +-
.../commons}/quotas/FixedIntervalRateLimiter.java | 2 +-
.../apache/iotdb/commons}/quotas/RateLimiter.java | 6 +--
12 files changed, 119 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index f4bb131b3bf..6c89e19291b 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -248,6 +248,7 @@ public class IoTConsensusConfig {
private final long checkpointGap;
private final long allocateMemoryForConsensus;
private final long allocateMemoryForQueue;
+ private final long regionMigrationSpeedLimitBytesPerSecond;
private Replication(
int maxLogEntriesNumPerBatch,
@@ -262,7 +263,8 @@ public class IoTConsensusConfig {
long throttleTimeOutMs,
long checkpointGap,
long allocateMemoryForConsensus,
- double maxMemoryRatioForQueue) {
+ double maxMemoryRatioForQueue,
+ long regionMigrationSpeedLimitBytesPerSecond) {
this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
this.maxSizePerBatch = maxSizePerBatch;
this.maxPendingBatchesNum = maxPendingBatchesNum;
@@ -276,6 +278,7 @@ public class IoTConsensusConfig {
this.checkpointGap = checkpointGap;
this.allocateMemoryForConsensus = allocateMemoryForConsensus;
this.allocateMemoryForQueue = (long) (allocateMemoryForConsensus *
maxMemoryRatioForQueue);
+ this.regionMigrationSpeedLimitBytesPerSecond =
regionMigrationSpeedLimitBytesPerSecond;
}
public int getMaxLogEntriesNumPerBatch() {
@@ -330,6 +333,10 @@ public class IoTConsensusConfig {
return allocateMemoryForQueue;
}
+ public long getRegionMigrationSpeedLimitBytesPerSecond() {
+ return regionMigrationSpeedLimitBytesPerSecond;
+ }
+
public static Replication.Builder newBuilder() {
return new Replication.Builder();
}
@@ -350,6 +357,7 @@ public class IoTConsensusConfig {
private long checkpointGap = 500;
private long allocateMemoryForConsensus =
Runtime.getRuntime().maxMemory() / 10;
private double maxMemoryRatioForQueue = 0.6;
+ private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;
public Replication.Builder setMaxLogEntriesNumPerBatch(int
maxLogEntriesNumPerBatch) {
this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
@@ -418,6 +426,12 @@ public class IoTConsensusConfig {
return this;
}
+ public Builder setRegionMigrationSpeedLimitBytesPerSecond(
+ long regionMigrationSpeedLimitBytesPerSecond) {
+ this.regionMigrationSpeedLimitBytesPerSecond =
regionMigrationSpeedLimitBytesPerSecond;
+ return this;
+ }
+
public Replication build() {
return new Replication(
maxLogEntriesNumPerBatch,
@@ -432,7 +446,8 @@ public class IoTConsensusConfig {
throttleTimeOutMs,
checkpointGap,
allocateMemoryForConsensus,
- maxMemoryRatioForQueue);
+ maxMemoryRatioForQueue,
+ regionMigrationSpeedLimitBytesPerSecond);
}
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index a4d249c3d49..01d86f851db 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -57,6 +57,7 @@ import
org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager;
import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService;
import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor;
+import org.apache.iotdb.consensus.iot.snapshot.IoTConsensusRateLimiter;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -120,6 +121,13 @@ public class IoTConsensus implements IConsensus {
.init(
config.getIotConsensusConfig().getReplication().getAllocateMemoryForConsensus(),
config.getIotConsensusConfig().getReplication().getAllocateMemoryForQueue());
+ // init IoTConsensus Rate Limiter
+ IoTConsensusRateLimiter.getInstance()
+ .init(
+ config
+ .getIotConsensusConfig()
+ .getReplication()
+ .getRegionMigrationSpeedLimitBytesPerSecond());
}
@Override
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index b553218584a..ecd8debea7b 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -43,6 +43,7 @@ import
org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.consensus.iot.log.GetConsensusReqReaderPlan;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
+import org.apache.iotdb.consensus.iot.snapshot.IoTConsensusRateLimiter;
import org.apache.iotdb.consensus.iot.snapshot.SnapshotFragmentReader;
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerReq;
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerRes;
@@ -120,6 +121,8 @@ public class IoTConsensusServerImpl {
private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
private final String consensusGroupId;
private final ScheduledExecutorService backgroundTaskService;
+ private final IoTConsensusRateLimiter ioTConsensusRateLimiter =
+ IoTConsensusRateLimiter.getInstance();
public IoTConsensusServerImpl(
String storageDir,
@@ -316,6 +319,7 @@ public class IoTConsensusServerImpl {
// TODO: zero copy ?
TSendSnapshotFragmentReq req =
reader.next().toTSendSnapshotFragmentReq();
req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
+
ioTConsensusRateLimiter.acquireTransitDataSizeWithRateLimiter(req.getChunkLength());
TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
if (!isSuccess(res.getStatus())) {
throw new ConsensusGroupModifyPeerException(
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/IoTConsensusRateLimiter.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/IoTConsensusRateLimiter.java
new file mode 100644
index 00000000000..2622a7d16b3
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/IoTConsensusRateLimiter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.snapshot;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IoTConsensusRateLimiter {
+ private static final Logger logger =
LoggerFactory.getLogger(IoTConsensusRateLimiter.class);
+
+ private final RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);
+
+ private IoTConsensusRateLimiter() {}
+
+ public void init(long regionMigrationSpeedLimitBytesPerSecond) {
+ rateLimiter.setRate(regionMigrationSpeedLimitBytesPerSecond);
+ }
+
+ /**
+ * Acquire the size of the data to be sent.
+ *
+ * @param transitDataSize the size of the data to be sent
+ */
+ public void acquireTransitDataSizeWithRateLimiter(long transitDataSize) {
+ while (transitDataSize > 0) {
+ if (transitDataSize > Integer.MAX_VALUE) {
+ rateLimiter.acquire(Integer.MAX_VALUE);
+ transitDataSize -= Integer.MAX_VALUE;
+ } else {
+ rateLimiter.acquire((int) transitDataSize);
+ return;
+ }
+ }
+ }
+
+ private static final IoTConsensusRateLimiter INSTANCE = new
IoTConsensusRateLimiter();
+
+ public static IoTConsensusRateLimiter getInstance() {
+ return INSTANCE;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 27c07ff6bd0..e7493770f2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1078,6 +1078,7 @@ public class IoTDBConfig {
private int maxSizePerBatch = 16 * 1024 * 1024;
private int maxPendingBatchesNum = 5;
private double maxMemoryRatioForQueue = 0.6;
+ private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;
/** Load related */
private double maxAllocateMemoryRatioForLoad = 0.8;
@@ -1127,6 +1128,15 @@ public class IoTDBConfig {
this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
}
+ public long getRegionMigrationSpeedLimitBytesPerSecond() {
+ return regionMigrationSpeedLimitBytesPerSecond;
+ }
+
+ public void setRegionMigrationSpeedLimitBytesPerSecond(
+ long regionMigrationSpeedLimitBytesPerSecond) {
+ this.regionMigrationSpeedLimitBytesPerSecond =
regionMigrationSpeedLimitBytesPerSecond;
+ }
+
public void setMaxSizePerBatch(int maxSizePerBatch) {
this.maxSizePerBatch = maxSizePerBatch;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 14438eb845e..df797ff1afe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1067,6 +1067,13 @@ public class IoTDBDescriptor {
"data_region_iot_max_memory_ratio_for_queue",
String.valueOf(conf.getMaxMemoryRatioForQueue()))
.trim()));
+ conf.setRegionMigrationSpeedLimitBytesPerSecond(
+ Long.parseLong(
+ properties
+ .getProperty(
+ "region_migration_speed_limit_bytes_per_second",
+
String.valueOf(conf.getRegionMigrationSpeedLimitBytesPerSecond()))
+ .trim()));
}
private void loadAuthorCache(Properties properties) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index f1f978fd477..42f80ccd61a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -112,6 +112,8 @@ public class DataRegionConsensusImpl {
.setMaxSizePerBatch(CONF.getMaxSizePerBatch())
.setMaxPendingBatchesNum(CONF.getMaxPendingBatchesNum())
.setMaxMemoryRatioForQueue(CONF.getMaxMemoryRatioForQueue())
+
.setRegionMigrationSpeedLimitBytesPerSecond(
+
CONF.getRegionMigrationSpeedLimitBytesPerSecond())
.build())
.build())
.setRatisConfig(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/QuotaLimiter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/QuotaLimiter.java
index 138df719fdc..9a659d67dd9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/QuotaLimiter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/QuotaLimiter.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.db.storageengine.rescon.quotas;
import org.apache.iotdb.common.rpc.thrift.TTimedQuota;
import org.apache.iotdb.common.rpc.thrift.ThrottleType;
import org.apache.iotdb.commons.exception.RpcThrottlingException;
+import org.apache.iotdb.commons.quotas.AverageIntervalRateLimiter;
+import org.apache.iotdb.commons.quotas.FixedIntervalRateLimiter;
+import org.apache.iotdb.commons.quotas.RateLimiter;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index fc2da76d388..6b98d0906c9 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -771,6 +771,10 @@ data_replication_factor=1
# Datatype: double
# data_region_iot_max_memory_ratio_for_queue = 0.6
+# The maximum transit size in byte per second for region migration
+# Datatype: long
+# region_migration_speed_limit_bytes_per_second = 33554432
+
####################
### TsFile Configurations
####################
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/AverageIntervalRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/AverageIntervalRateLimiter.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/AverageIntervalRateLimiter.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/AverageIntervalRateLimiter.java
index c88b78eb434..87c75ba9315 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/AverageIntervalRateLimiter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/AverageIntervalRateLimiter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.storageengine.rescon.quotas;
+package org.apache.iotdb.commons.quotas;
/**
* This limiter will refill resources at every TimeUnit/resources interval.
For example: For a
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/FixedIntervalRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/FixedIntervalRateLimiter.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/FixedIntervalRateLimiter.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/FixedIntervalRateLimiter.java
index d17c5a4e2f1..739c9889bd5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/FixedIntervalRateLimiter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/FixedIntervalRateLimiter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.storageengine.rescon.quotas;
+package org.apache.iotdb.commons.quotas;
/**
* With this limiter resources will be refilled only after a fixed interval of
time. Copy from
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/RateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/RateLimiter.java
similarity index 95%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/RateLimiter.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/RateLimiter.java
index be35c8e48db..dee54e70c0b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/RateLimiter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/RateLimiter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.storageengine.rescon.quotas;
+package org.apache.iotdb.commons.quotas;
public abstract class RateLimiter {
@@ -71,7 +71,7 @@ public abstract class RateLimiter {
* @param limit Maximum available resource units that can be refilled to.
* @return how many resource units may be refilled ?
*/
- abstract long refill(long limit);
+ protected abstract long refill(long limit);
/**
* Time in milliseconds to wait for before requesting to consume 'amount'
resource.
@@ -81,7 +81,7 @@ public abstract class RateLimiter {
* @param amount Resources for which time interval to calculate for
* @return estimate of the ms required to wait before being able to provide
'amount' resources.
*/
- abstract long getWaitInterval(long limit, long available, long amount);
+ protected abstract long getWaitInterval(long limit, long available, long
amount);
public synchronized long getTimeUnitInMillis() {
return tunit;