This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ddd10057fd7 Load: Add LoadTsfilePointRateLimiter to control load
tsfile throughput rate (#12356)
ddd10057fd7 is described below
commit ddd10057fd78b9acb81741df8f92c96db149b17b
Author: Itami Sho <[email protected]>
AuthorDate: Fri May 10 16:53:37 2024 +0800
Load: Add LoadTsfilePointRateLimiter to control load tsfile throughput rate
(#12356)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +++
.../execution/load/AlignedChunkData.java | 3 +
.../execution/load/LoadTsFileRateLimiter.java | 79 ++++++++++++++++++++++
.../execution/load/NonAlignedChunkData.java | 3 +
.../iotdb/db/storageengine/StorageEngine.java | 3 +
.../db/storageengine/dataregion/DataRegion.java | 3 +
.../compaction/schedule/CompactionTaskManager.java | 6 +-
8 files changed, 115 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f3554407dcf..5e481330d09 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1107,6 +1107,8 @@ public class IoTDBConfig {
private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min
+ private double loadWriteThroughputBytesPerSecond = Double.MAX_VALUE; //
Bytes/s
+
/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during
querying */
private String[] pipeReceiverFileDirs = new String[0];
@@ -3816,6 +3818,14 @@ public class IoTDBConfig {
this.loadCleanupTaskExecutionDelayTimeSeconds =
loadCleanupTaskExecutionDelayTimeSeconds;
}
+ public double getLoadWriteThroughputBytesPerSecond() {
+ return loadWriteThroughputBytesPerSecond;
+ }
+
+ public void setLoadWriteThroughputBytesPerSecond(double
loadWriteThroughputBytesPerSecond) {
+ this.loadWriteThroughputBytesPerSecond = loadWriteThroughputBytesPerSecond;
+ }
+
public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 690ad97f6f5..27bf490e466 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -919,6 +919,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_clean_up_task_execution_delay_time_seconds",
String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
+ conf.setLoadWriteThroughputBytesPerSecond(
+ Double.parseDouble(
+ properties.getProperty(
+ "load_write_throughput_bytes_per_sec",
+ String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
conf.setExtPipeDir(properties.getProperty("ext_pipe_dir",
conf.getExtPipeDir()).trim());
@@ -1710,6 +1715,12 @@ public class IoTDBDescriptor {
"load_clean_up_task_execution_delay_time_seconds",
String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
+ conf.setLoadWriteThroughputBytesPerSecond(
+ Double.parseDouble(
+ properties.getProperty(
+ "load_write_throughput_bytes_per_sec",
+
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
+
// update merge_threshold_of_explain_analyze
conf.setMergeThresholdOfExplainAnalyze(
Integer.parseInt(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
index 3eb431ddff1..46bd37e630b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
@@ -158,6 +158,7 @@ public class AlignedChunkData implements ChunkData {
private void serializeAttr(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
ReadWriteIOUtils.write(device, stream);
+ ReadWriteIOUtils.write(dataSize, stream);
ReadWriteIOUtils.write(needDecodeChunk, stream);
ReadWriteIOUtils.write(chunkHeaderList.size(), stream);
for (ChunkHeader chunkHeader : chunkHeaderList) {
@@ -391,6 +392,7 @@ public class AlignedChunkData implements ChunkData {
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartitionSlot(ReadWriteIOUtils.readLong(stream));
String device = ReadWriteIOUtils.readString(stream);
+ long dataSize = ReadWriteIOUtils.readLong(stream);
boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
int chunkHeaderListSize = ReadWriteIOUtils.readInt(stream);
List<ChunkHeader> chunkHeaderList = new ArrayList<>();
@@ -411,6 +413,7 @@ public class AlignedChunkData implements ChunkData {
chunkData.chunkHeaderList = chunkHeaderList;
chunkData.pageNumbers = pageNumbers;
chunkData.deserializeTsFileData(stream);
+ chunkData.dataSize = dataSize;
chunkData.close();
return chunkData;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
new file mode 100644
index 00000000000..8aa7841a8d0
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import com.google.common.util.concurrent.RateLimiter;
+
+public class LoadTsFileRateLimiter {
+ // Singleton wrapper around a Guava RateLimiter whose rate comes from CONFIG.getLoadWriteThroughputBytesPerSecond().
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ // Rate most recently applied to the limiter; acquire() compares it with CONFIG to detect a changed setting.
+ private final AtomicDouble throughputBytesPerSecond =
+ new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond());
+ private final RateLimiter loadWriteRateLimiter;
+ // Creates the limiter from the rate captured in throughputBytesPerSecond at construction time.
+ private LoadTsFileRateLimiter() {
+ final double throughputBytesPerSecondLimit =
throughputBytesPerSecond.get();
+ loadWriteRateLimiter =
+ // a configured rate <= 0 means "unlimited": Double.MAX_VALUE is used as the rate instead
+ throughputBytesPerSecondLimit <= 0
+ ? RateLimiter.create(Double.MAX_VALUE)
+ : RateLimiter.create(throughputBytesPerSecondLimit);
+ }
+ // Acquires `bytes` permits (one permit per byte), first applying the configured rate if it has changed.
+ public void acquire(long bytes) {
+ if (throughputBytesPerSecond.get() !=
CONFIG.getLoadWriteThroughputBytesPerSecond()) {
+ final double newThroughputBytesPerSecond =
CONFIG.getLoadWriteThroughputBytesPerSecond();
+ throughputBytesPerSecond.set(newThroughputBytesPerSecond);
+ loadWriteRateLimiter.setRate(
+ // a configured rate <= 0 means "unlimited": Double.MAX_VALUE is used as the rate instead
+ newThroughputBytesPerSecond <= 0 ? Double.MAX_VALUE :
newThroughputBytesPerSecond);
+ }
+ // The limiter is called with int permit counts, so a long request is split into chunks of at most Integer.MAX_VALUE; bytes <= 0 acquires nothing.
+ while (bytes > 0) {
+ if (bytes > Integer.MAX_VALUE) {
+ loadWriteRateLimiter.acquire(Integer.MAX_VALUE);
+ bytes -= Integer.MAX_VALUE;
+ } else {
+ loadWriteRateLimiter.acquire((int) bytes);
+ return;
+ }
+ }
+ }
+
+ //////////////////////////// Singleton ////////////////////////////
+ // Holder whose static INSTANCE field is the only LoadTsFileRateLimiter created in this class.
+ private static class LoadTsFileRateLimiterHolder {
+
+ private static final LoadTsFileRateLimiter INSTANCE = new
LoadTsFileRateLimiter();
+
+ private LoadTsFileRateLimiterHolder() {
+ // Prevent instantiation
+ }
+ }
+ // Returns the shared instance held by LoadTsFileRateLimiterHolder.
+ public static LoadTsFileRateLimiter getInstance() {
+ return LoadTsFileRateLimiterHolder.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
index f1165081bea..77467d55e98 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
@@ -128,6 +128,7 @@ public class NonAlignedChunkData implements ChunkData {
private void serializeAttr(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
ReadWriteIOUtils.write(device, stream);
+ ReadWriteIOUtils.write(dataSize, stream);
ReadWriteIOUtils.write(needDecodeChunk, stream);
chunkHeader.serializeTo(stream); // chunk header already serialize chunk
type
if (needDecodeChunk) {
@@ -263,6 +264,7 @@ public class NonAlignedChunkData implements ChunkData {
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartitionSlot(ReadWriteIOUtils.readLong(stream));
String device = ReadWriteIOUtils.readString(stream);
+ long dataSize = ReadWriteIOUtils.readLong(stream);
boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
byte chunkType = ReadWriteIOUtils.readByte(stream);
ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream, chunkType);
@@ -275,6 +277,7 @@ public class NonAlignedChunkData implements ChunkData {
chunkData.needDecodeChunk = needDecodeChunk;
chunkData.pageNumber = pageNumber;
chunkData.deserializeTsFileData(stream);
+ chunkData.dataSize = dataSize;
chunkData.close();
return chunkData;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index c8c35e0958c..218b80392ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
+import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
@@ -851,6 +852,8 @@ public class StorageEngine implements IService {
return status;
}
+ LoadTsFileRateLimiter.getInstance().acquire(pieceNode.getDataSize());
+
try {
loadTsFileManager.writeToDataRegion(getDataRegion(dataRegionId),
pieceNode, uuid);
} catch (IOException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3e97a3b2c72..18dcadffc07 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -47,6 +47,7 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -2808,6 +2809,8 @@ public class DataRegion implements IDataRegionForQuery {
tsFileToLoad.getAbsolutePath(),
targetFile.getAbsolutePath());
+
LoadTsFileRateLimiter.getInstance().acquire(tsFileResource.getTsFile().length());
+
// move file from sync dir to data dir
if (!targetFile.getParentFile().exists()) {
targetFile.getParentFile().mkdirs();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index fbe9970f933..b0066398318 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -275,8 +275,8 @@ public class CompactionTaskManager implements IService {
return compactionReadOperationRateLimiter;
}
- public void setWriteMergeRate(final double throughoutMbPerSec) {
- setRate(mergeWriteRateLimiter, throughoutMbPerSec * 1024.0 * 1024.0);
+ public void setWriteMergeRate(final double throughputMbPerSec) {
+ setRate(mergeWriteRateLimiter, throughputMbPerSec * 1024.0 * 1024.0);
}
public void setCompactionReadOperationRate(final double readOperationPerSec)
{
@@ -288,7 +288,7 @@ public class CompactionTaskManager implements IService {
}
private void setRate(RateLimiter rateLimiter, double rate) {
- // if throughout = 0, disable rate limiting
+ // if throughput <= 0, disable rate limiting
if (rate <= 0) {
rate = Double.MAX_VALUE;
}