This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ddd10057fd7 Load: Add LoadTsfilePointRateLimiter to control load tsfile throughput rate (#12356)
ddd10057fd7 is described below

commit ddd10057fd78b9acb81741df8f92c96db149b17b
Author: Itami Sho <[email protected]>
AuthorDate: Fri May 10 16:53:37 2024 +0800

    Load: Add LoadTsfilePointRateLimiter to control load tsfile throughput rate (#12356)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 11 +++
 .../execution/load/AlignedChunkData.java           |  3 +
 .../execution/load/LoadTsFileRateLimiter.java      | 79 ++++++++++++++++++++++
 .../execution/load/NonAlignedChunkData.java        |  3 +
 .../iotdb/db/storageengine/StorageEngine.java      |  3 +
 .../db/storageengine/dataregion/DataRegion.java    |  3 +
 .../compaction/schedule/CompactionTaskManager.java |  6 +-
 8 files changed, 115 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f3554407dcf..5e481330d09 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1107,6 +1107,8 @@ public class IoTDBConfig {
 
   private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min
 
+  private double loadWriteThroughputBytesPerSecond = Double.MAX_VALUE; // Bytes/s
+
   /** Pipe related */
   /** initialized as empty, updated based on the latest `systemDir` during querying */
   private String[] pipeReceiverFileDirs = new String[0];
@@ -3816,6 +3818,14 @@ public class IoTDBConfig {
     this.loadCleanupTaskExecutionDelayTimeSeconds = loadCleanupTaskExecutionDelayTimeSeconds;
   }
 
+  public double getLoadWriteThroughputBytesPerSecond() {
+    return loadWriteThroughputBytesPerSecond;
+  }
+
+  public void setLoadWriteThroughputBytesPerSecond(double loadWriteThroughputBytesPerSecond) {
+    this.loadWriteThroughputBytesPerSecond = loadWriteThroughputBytesPerSecond;
+  }
+
   public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
     this.pipeReceiverFileDirs = pipeReceiverFileDirs;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 690ad97f6f5..27bf490e466 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -919,6 +919,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "load_clean_up_task_execution_delay_time_seconds",
                 String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
+    conf.setLoadWriteThroughputBytesPerSecond(
+        Double.parseDouble(
+            properties.getProperty(
+                "load_write_throughput_bytes_per_sec",
+                String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
 
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
 
@@ -1710,6 +1715,12 @@ public class IoTDBDescriptor {
                   "load_clean_up_task_execution_delay_time_seconds",
                   String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
 
+      conf.setLoadWriteThroughputBytesPerSecond(
+          Double.parseDouble(
+              properties.getProperty(
+                  "load_write_throughput_bytes_per_sec",
+                  String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
+
       // update merge_threshold_of_explain_analyze
       conf.setMergeThresholdOfExplainAnalyze(
           Integer.parseInt(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
index 3eb431ddff1..46bd37e630b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
@@ -158,6 +158,7 @@ public class AlignedChunkData implements ChunkData {
   private void serializeAttr(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
     ReadWriteIOUtils.write(device, stream);
+    ReadWriteIOUtils.write(dataSize, stream);
     ReadWriteIOUtils.write(needDecodeChunk, stream);
     ReadWriteIOUtils.write(chunkHeaderList.size(), stream);
     for (ChunkHeader chunkHeader : chunkHeaderList) {
@@ -391,6 +392,7 @@ public class AlignedChunkData implements ChunkData {
     TTimePartitionSlot timePartitionSlot =
         TimePartitionUtils.getTimePartitionSlot(ReadWriteIOUtils.readLong(stream));
     String device = ReadWriteIOUtils.readString(stream);
+    long dataSize = ReadWriteIOUtils.readLong(stream);
     boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
     int chunkHeaderListSize = ReadWriteIOUtils.readInt(stream);
     List<ChunkHeader> chunkHeaderList = new ArrayList<>();
@@ -411,6 +413,7 @@ public class AlignedChunkData implements ChunkData {
     chunkData.chunkHeaderList = chunkHeaderList;
     chunkData.pageNumbers = pageNumbers;
     chunkData.deserializeTsFileData(stream);
+    chunkData.dataSize = dataSize;
     chunkData.close();
     return chunkData;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
new file mode 100644
index 00000000000..8aa7841a8d0
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import com.google.common.util.concurrent.RateLimiter;
+
+public class LoadTsFileRateLimiter {
+
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  private final AtomicDouble throughputBytesPerSecond =
+      new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond());
+  private final RateLimiter loadWriteRateLimiter;
+
+  private LoadTsFileRateLimiter() {
+    final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get();
+    loadWriteRateLimiter =
+        // if throughput <= 0, disable rate limiting
+        throughputBytesPerSecondLimit <= 0
+            ? RateLimiter.create(Double.MAX_VALUE)
+            : RateLimiter.create(throughputBytesPerSecondLimit);
+  }
+
+  public void acquire(long bytes) {
+    if (throughputBytesPerSecond.get() != CONFIG.getLoadWriteThroughputBytesPerSecond()) {
+      final double newThroughputBytesPerSecond = CONFIG.getLoadWriteThroughputBytesPerSecond();
+      throughputBytesPerSecond.set(newThroughputBytesPerSecond);
+      loadWriteRateLimiter.setRate(
+          // if throughput <= 0, disable rate limiting
+          newThroughputBytesPerSecond <= 0 ? Double.MAX_VALUE : newThroughputBytesPerSecond);
+    }
+
+    while (bytes > 0) {
+      if (bytes > Integer.MAX_VALUE) {
+        loadWriteRateLimiter.acquire(Integer.MAX_VALUE);
+        bytes -= Integer.MAX_VALUE;
+      } else {
+        loadWriteRateLimiter.acquire((int) bytes);
+        return;
+      }
+    }
+  }
+
+  //////////////////////////// Singleton ////////////////////////////
+
+  private static class LoadTsFileRateLimiterHolder {
+
+    private static final LoadTsFileRateLimiter INSTANCE = new LoadTsFileRateLimiter();
+
+    private LoadTsFileRateLimiterHolder() {
+      // Prevent instantiation
+    }
+  }
+
+  public static LoadTsFileRateLimiter getInstance() {
+    return LoadTsFileRateLimiterHolder.INSTANCE;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
index f1165081bea..77467d55e98 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
@@ -128,6 +128,7 @@ public class NonAlignedChunkData implements ChunkData {
   private void serializeAttr(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
     ReadWriteIOUtils.write(device, stream);
+    ReadWriteIOUtils.write(dataSize, stream);
     ReadWriteIOUtils.write(needDecodeChunk, stream);
     chunkHeader.serializeTo(stream); // chunk header already serialize chunk type
     if (needDecodeChunk) {
@@ -263,6 +264,7 @@ public class NonAlignedChunkData implements ChunkData {
     TTimePartitionSlot timePartitionSlot =
         TimePartitionUtils.getTimePartitionSlot(ReadWriteIOUtils.readLong(stream));
     String device = ReadWriteIOUtils.readString(stream);
+    long dataSize = ReadWriteIOUtils.readLong(stream);
     boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
     byte chunkType = ReadWriteIOUtils.readByte(stream);
     ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream, chunkType);
@@ -275,6 +277,7 @@ public class NonAlignedChunkData implements ChunkData {
     chunkData.needDecodeChunk = needDecodeChunk;
     chunkData.pageNumber = pageNumber;
     chunkData.deserializeTsFileData(stream);
+    chunkData.dataSize = dataSize;
     chunkData.close();
     return chunkData;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index c8c35e0958c..218b80392ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
+import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
@@ -851,6 +852,8 @@ public class StorageEngine implements IService {
       return status;
     }
 
+    LoadTsFileRateLimiter.getInstance().acquire(pieceNode.getDataSize());
+
     try {
       loadTsFileManager.writeToDataRegion(getDataRegion(dataRegionId), pieceNode, uuid);
     } catch (IOException e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3e97a3b2c72..18dcadffc07 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
 import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -2808,6 +2809,8 @@ public class DataRegion implements IDataRegionForQuery {
         tsFileToLoad.getAbsolutePath(),
         targetFile.getAbsolutePath());
 
+    LoadTsFileRateLimiter.getInstance().acquire(tsFileResource.getTsFile().length());
+
     // move file from sync dir to data dir
     if (!targetFile.getParentFile().exists()) {
       targetFile.getParentFile().mkdirs();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index fbe9970f933..b0066398318 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -275,8 +275,8 @@ public class CompactionTaskManager implements IService {
     return compactionReadOperationRateLimiter;
   }
 
-  public void setWriteMergeRate(final double throughoutMbPerSec) {
-    setRate(mergeWriteRateLimiter, throughoutMbPerSec * 1024.0 * 1024.0);
+  public void setWriteMergeRate(final double throughputMbPerSec) {
+    setRate(mergeWriteRateLimiter, throughputMbPerSec * 1024.0 * 1024.0);
   }
 
   public void setCompactionReadOperationRate(final double readOperationPerSec) {
@@ -288,7 +288,7 @@ public class CompactionTaskManager implements IService {
   }
 
   private void setRate(RateLimiter rateLimiter, double rate) {
-    // if throughout = 0, disable rate limiting
+    // if throughput = 0, disable rate limiting
     if (rate <= 0) {
       rate = Double.MAX_VALUE;
     }

Reply via email to