This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 11bc5981aa Revert "[IOTDB-5209] Limit the read rate of compaction execution" (#8788)
11bc5981aa is described below

commit 11bc5981aa3fb99ecf4887ecfe4afd32c62f1086
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Mon Jan 9 12:13:51 2023 +0800

    Revert "[IOTDB-5209] Limit the read rate of compaction execution" (#8788)
---
 docs/UserGuide/Reference/Config-Manual.md                | 16 ++++++++--------
 docs/zh/UserGuide/Reference/Config-Manual.md             | 16 ++++++++--------
 .../src/assembly/resources/conf/iotdb-engine.properties  |  4 ++--
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java  | 12 ++++++------
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java   |  8 ++++----
 .../db/engine/compaction/CompactionTaskManager.java      | 13 +++++++------
 .../iotdb/db/engine/compaction/CompactionUtils.java      |  3 ---
 .../compaction/cross/rewrite/task/SubCompactionTask.java |  2 --
 .../inner/utils/AlignedSeriesCompactionExecutor.java     |  9 +++++----
 .../inner/utils/SingleSeriesCompactionExecutor.java      | 12 ++++++------
 .../compaction/writer/AbstractCompactionWriter.java      |  3 ++-
 .../compaction/utils/CompactionConfigRestorer.java       |  2 +-
 12 files changed, 49 insertions(+), 51 deletions(-)

diff --git a/docs/UserGuide/Reference/Config-Manual.md b/docs/UserGuide/Reference/Config-Manual.md
index 0694d40d31..5596cb2098 100644
--- a/docs/UserGuide/Reference/Config-Manual.md
+++ b/docs/UserGuide/Reference/Config-Manual.md
@@ -913,14 +913,14 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
 |Default| 60000 |
 |Effective|After restart system|
 
-* compaction\_io\_rate\_per\_sec
-
-|Name| compaction\_io\_rate\_per\_sec                    |
-|:---:|:--------------------------------------------------|
-|Description| The io rate of all compaction tasks in one second |
-|Type| Int32                                             |
-|Default| 50                                                |
-|Effective| After restart system                              |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|Name| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|Description| The write rate of all compaction tasks in MB/s |
+|Type| Int32 |
+|Default| 16 |
+|Effective|After restart system|
 
 ### Insertion
 
diff --git a/docs/zh/UserGuide/Reference/Config-Manual.md b/docs/zh/UserGuide/Reference/Config-Manual.md
index 5fa2653b8b..33f71497b7 100644
--- a/docs/zh/UserGuide/Reference/Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Config-Manual.md
@@ -952,14 +952,14 @@ Server,客户端的使用方式详见 [SQL 命令行终端(CLI)](https://i
 |默认值| 60000 |
 |改后生效方式|重启服务生效|
 
-* compaction\_io\_rate\_per\_sec
-
-|名字| compaction\_io\_rate\_per\_sec |
-|:---:|:-------------------------------|
-|描述| 合并每秒 IO 的次数                    |
-|类型| Int32                          |
-|默认值| 50                             |
-|改后生效方式| 重启服务生效                         |
+* compaction\_write\_throughput\_mb\_per\_sec
+
+|名字| compaction\_write\_throughput\_mb\_per\_sec |
+|:---:|:---|
+|描述| 每秒可达到的写入吞吐量合并限制。|
+|类型| Int32 |
+|默认值| 16 |
+|改后生效方式| 重启服务生效|
 
 
 #### 插入配置
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 33f4926348..ac166426b3 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -509,9 +509,9 @@ timestamp_precision=ms
 # Datatype: long, Unit: ms
 # compaction_submission_interval_in_ms=60000
 
-# The limit of io rate of compaction can reach per second
+# The limit of write throughput merge can reach per second
 # Datatype: int
-# compaction_io_rate_per_sec=50
+# compaction_write_throughput_mb_per_sec=16
 
 # The maximum session idle time. unit: ms
 # Idle sessions are the ones that performs neither query or non-query operations for a period of time
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c0ce1248d5..b195d3df55 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -600,8 +600,8 @@ public class IoTDBConfig {
    */
   private long mergeIntervalSec = 0L;
 
-  /** The limit of compaction io times can reach per second */
-  private int compactionIORatePerSec = 50;
+  /** The limit of compaction merge can reach per second */
+  private int compactionWriteThroughputMbPerSec = 16;
 
   /**
    * How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or
@@ -1718,12 +1718,12 @@ public class IoTDBConfig {
         insertMultiTabletEnableMultithreadingColumnThreshold;
   }
 
-  public int getCompactionIORatePerSec() {
-    return compactionIORatePerSec;
+  public int getCompactionWriteThroughputMbPerSec() {
+    return compactionWriteThroughputMbPerSec;
   }
 
-  public void setCompactionIORatePerSec(int compactionIORatePerSec) {
-    this.compactionIORatePerSec = compactionIORatePerSec;
+  public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMbPerSec) {
+    this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
   }
 
   public boolean isEnableMemControl() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1d0a89d2b9..2616bbb15e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -591,11 +591,11 @@ public class IoTDBDescriptor {
                 "max_cross_compaction_candidate_file_size",
                 Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
 
-    conf.setCompactionIORatePerSec(
+    conf.setCompactionWriteThroughputMbPerSec(
         Integer.parseInt(
             properties.getProperty(
                 "compaction_write_throughput_mb_per_sec",
-                Integer.toString(conf.getCompactionIORatePerSec()))));
+                Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
     conf.setEnablePartialInsert(
         Boolean.parseBoolean(
@@ -1364,11 +1364,11 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "slow_query_threshold", Long.toString(conf.getSlowQueryThreshold()))));
       // update merge_write_throughput_mb_per_sec
-      conf.setCompactionIORatePerSec(
+      conf.setCompactionWriteThroughputMbPerSec(
           Integer.parseInt(
               properties.getProperty(
                   "merge_write_throughput_mb_per_sec",
-                  Integer.toString(conf.getCompactionIORatePerSec()))));
+                  Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
       // update insert-tablet-plan's row limit for select-into
       conf.setSelectIntoInsertTabletPlanRowLimit(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 7fdd197fc0..ddac1512ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -83,7 +83,7 @@ public class CompactionTaskManager implements IService {
   private final long TASK_SUBMIT_INTERVAL =
       IoTDBDescriptor.getInstance().getConfig().getCompactionSubmissionIntervalInMs();
 
-  private final RateLimiter compactionIORateLimiter = RateLimiter.create(Double.MAX_VALUE);
+  private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
 
   public static CompactionTaskManager getInstance() {
     return INSTANCE;
@@ -274,9 +274,10 @@ public class CompactionTaskManager implements IService {
     }
   }
 
-  public RateLimiter getCompactionIORateLimiter() {
-    setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIORatePerSec());
-    return compactionIORateLimiter;
+  public RateLimiter getMergeWriteRateLimiter() {
+    setWriteMergeRate(
+        IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
+    return mergeWriteRateLimiter;
   }
 
   private void setWriteMergeRate(final double throughoutMbPerSec) {
@@ -285,8 +286,8 @@ public class CompactionTaskManager implements IService {
     if (throughout == 0) {
       throughout = Double.MAX_VALUE;
     }
-    if (compactionIORateLimiter.getRate() != throughout) {
-      compactionIORateLimiter.setRate(throughout);
+    if (mergeWriteRateLimiter.getRate() != throughout) {
+      mergeWriteRateLimiter.setRate(throughout);
     }
   }
   /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 2a786d7d95..7c94873747 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -150,9 +150,6 @@ public class CompactionUtils {
       compactionWriter.startChunkGroup(device, true);
       compactionWriter.startMeasurement(measurementSchemas, 0);
       writeWithReader(compactionWriter, dataBatchReader, 0);
-      CompactionTaskManager.getInstance()
-          .getCompactionIORateLimiter()
-          .acquire(measurementSchemas.size() + 1);
       compactionWriter.endMeasurement(0);
       compactionWriter.endChunkGroup();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
index f75e934b87..00e7eaa011 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.CompactionUtils;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -92,7 +91,6 @@ public class SubCompactionTask implements Callable<Void> {
         if (dataBatchReader.hasNextBatch()) {
           compactionWriter.startMeasurement(measurementSchemas, taskId);
           CompactionUtils.writeWithReader(compactionWriter, dataBatchReader, taskId);
-          CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1);
           compactionWriter.endMeasurement(taskId);
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index 990b9815c0..fd1ec218b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -58,7 +58,7 @@ public class AlignedSeriesCompactionExecutor {
   private final List<IMeasurementSchema> schemaList;
   private long remainingPointInChunkWriter = 0L;
   private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   private final long chunkSizeThreshold =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -120,7 +120,6 @@ public class AlignedSeriesCompactionExecutor {
       TsFileAlignedSeriesReaderIterator readerIterator =
           new TsFileAlignedSeriesReaderIterator(reader, alignedChunkMetadataList, schemaList);
       while (readerIterator.hasNext()) {
-        rateLimiter.acquire(schemaList.size() + 1);
         Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader();
         CompactionMetricsManager.recordReadInfo(chunkReaderAndChunkSize.right);
         compactOneAlignedChunk(chunkReaderAndChunkSize.left);
@@ -128,7 +127,8 @@ public class AlignedSeriesCompactionExecutor {
     }
 
     if (remainingPointInChunkWriter != 0L) {
-      rateLimiter.acquire(schemaList.size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsManager.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -167,7 +167,8 @@ public class AlignedSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (remainingPointInChunkWriter >= chunkPointNumThreshold
         || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
-      rateLimiter.acquire(schemaList.size() + 1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsManager.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index fa9ee0efd8..2d49094f44 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -58,13 +58,11 @@ public class SingleSeriesCompactionExecutor {
   private Chunk cachedChunk;
   private ChunkMetadata cachedChunkMetadata;
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
   // record the min time and max time to update the target resource
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
   private long pointCountInChunkWriter = 0;
-  private final RateLimiter ioLimiter =
-      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
 
   private final long targetChunkSize =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -284,7 +282,7 @@ public class SingleSeriesCompactionExecutor {
 
   private void flushChunkToFileWriter(
       Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException {
-    ioLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
     if (chunkMetadata.getStartTime() < minStartTimestamp) {
       minStartTimestamp = chunkMetadata.getStartTime();
     }
@@ -302,7 +300,8 @@ public class SingleSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (pointCountInChunkWriter >= targetChunkPointNum
         || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
-      ioLimiter.acquire(1);
+      CompactionTaskManager.mergeRateLimiterAcquire(
+          compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetricsManager.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -323,7 +322,8 @@ public class SingleSeriesCompactionExecutor {
   }
 
   private void flushChunkWriter() throws IOException {
-    ioLimiter.acquire(1);
+    CompactionTaskManager.mergeRateLimiterAcquire(
+        compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
     CompactionMetricsManager.recordWriteInfo(
         CompactionType.INNER_SEQ_COMPACTION,
         ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 683001b31c..72096069e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -142,6 +142,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   protected void flushChunkToFileWriter(TsFileIOWriter targetWriter, int subTaskId)
       throws IOException {
+    writeRateLimit(chunkWriters[subTaskId].estimateMaxSeriesMemSize());
     synchronized (targetWriter) {
       chunkWriters[subTaskId].writeToFileWriter(targetWriter);
     }
@@ -172,7 +173,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   protected void writeRateLimit(long bytesLength) {
     CompactionTaskManager.mergeRateLimiterAcquire(
-        CompactionTaskManager.getInstance().getCompactionIORateLimiter(), bytesLength);
+        CompactionTaskManager.getInstance().getMergeWriteRateLimiter(), bytesLength);
   }
 
   public abstract List<TsFileIOWriter> getFileIOWriter();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index 2f6110e8a7..014ad1afd5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -64,6 +64,6 @@ public class CompactionConfigRestorer {
     config.setConcurrentCompactionThread(concurrentCompactionThread);
     config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
     config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
-    config.setCompactionIORatePerSec(compactionWriteThroughputMbPerSec);
+    config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
   }
 }

Reply via email to