This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch zy131
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/zy131 by this push:
     new 175e8d7f198 zy131 compaction cp commits (#12565)
175e8d7f198 is described below

commit 175e8d7f198a2d9323036a06f40b4c1ae3bd02e0
Author: shuwenwei <[email protected]>
AuthorDate: Tue May 21 18:30:21 2024 +0800

    zy131 compaction cp commits (#12565)
    
    * Fix uncompress byte buffer in RepairDataFileScanUtil
    
    * Modify compaction task priority comparator (#12255)
    
    * delay estimate memory of InnerSpaceCompactionTask (#12314)
    
    * Compaction rate limit (#12312)
    
    * add compaction read rate limiter
    
    * modify method name
    
    * modify default compaction read rate limit
    
    * add config item to limit compaction read speed
    
    * add config item to limit compaction read speed
    
    * fix throughput rate is not correct
    
    * fix some issues
    
    * compaction read rate has no limit in default config
    
    * use correct unit when init rate limiter
    
    * modify maxSizePerWrite
    
    * remove useless code
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  27 +++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  48 ++++++++--
 .../compaction/io/CompactionTsFileOutput.java      | 104 +++++++++++++++++++++
 .../compaction/io/CompactionTsFileReader.java      |   7 ++
 .../compaction/io/CompactionTsFileWriter.java      |  22 +----
 .../compaction/repair/RepairDataFileScanUtil.java  |  15 +--
 .../compaction/schedule/CompactionScheduler.java   |   6 --
 .../compaction/schedule/CompactionTaskManager.java |  49 ++++++++--
 .../compaction/schedule/CompactionTaskQueue.java   |  52 +++++------
 .../DefaultCompactionTaskComparatorImpl.java       |  21 +----
 .../repair/RepairDataFileScanUtilTest.java         |  29 ++++++
 .../resources/conf/iotdb-common.properties         |  12 ++-
 12 files changed, 299 insertions(+), 93 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f70e3f8d8d1..8ddd2e1cbf6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -685,9 +685,18 @@ public class IoTDBConfig {
    */
   private long mergeIntervalSec = 0L;
 
-  /** The limit of compaction merge can reach per second */
+  /** The limit of compaction merge can reach per second. When <= 0, no limit. 
unit: megabyte */
   private int compactionWriteThroughputMbPerSec = 16;
 
+  /**
+   * The limit of compaction read throughput can reach per second. When <= 0, 
no limit. unit:
+   * megabyte
+   */
+  private int compactionReadThroughputMbPerSec = 0;
+
+  /** The limit of compaction read operation can reach per second. When <= 0, 
no limit. */
+  private int compactionReadOperationPerSec = 0;
+
   /**
    * How many thread will be set up to perform compaction, 10 by default. Set 
to 1 when less than or
    * equal to 0.
@@ -2047,6 +2056,22 @@ public class IoTDBConfig {
     this.memtableSizeThreshold = memtableSizeThreshold;
   }
 
+  public int getCompactionReadThroughputMbPerSec() {
+    return compactionReadThroughputMbPerSec;
+  }
+
+  public void setCompactionReadThroughputMbPerSec(int 
compactionReadThroughputMbPerSec) {
+    this.compactionReadThroughputMbPerSec = compactionReadThroughputMbPerSec;
+  }
+
+  public int getCompactionReadOperationPerSec() {
+    return compactionReadOperationPerSec;
+  }
+
+  public void setCompactionReadOperationPerSec(int 
compactionReadOperationPerSec) {
+    this.compactionReadOperationPerSec = compactionReadOperationPerSec;
+  }
+
   public boolean isEnableTimedFlushSeqMemtable() {
     return enableTimedFlushSeqMemtable;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 731e460333c..4d98da7ebc1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -657,6 +657,18 @@ public class IoTDBDescriptor {
                 "compaction_write_throughput_mb_per_sec",
                 
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
+    conf.setCompactionReadThroughputMbPerSec(
+        Integer.parseInt(
+            properties.getProperty(
+                "compaction_read_throughput_mb_per_sec",
+                
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
+
+    conf.setCompactionReadOperationPerSec(
+        Integer.parseInt(
+            properties.getProperty(
+                "compaction_read_operation_per_sec",
+                Integer.toString(conf.getCompactionReadOperationPerSec()))));
+
     conf.setEnableTsFileValidation(
         Boolean.parseBoolean(
             properties.getProperty(
@@ -1128,6 +1140,35 @@ public class IoTDBDescriptor {
     if (restartCompactionTaskManager) {
       CompactionTaskManager.getInstance().restart();
     }
+    // hot load compaction rate limit configurations
+
+    // update merge_write_throughput_mb_per_sec
+    conf.setCompactionWriteThroughputMbPerSec(
+        Integer.parseInt(
+            properties.getProperty(
+                "merge_write_throughput_mb_per_sec",
+                
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+
+    // update compaction_read_operation_per_sec
+    conf.setCompactionReadOperationPerSec(
+        Integer.parseInt(
+            properties.getProperty(
+                "compaction_read_operation_per_sec",
+                Integer.toString(conf.getCompactionReadOperationPerSec()))));
+
+    // update compaction_read_throughput_mb_per_sec
+    conf.setCompactionReadThroughputMbPerSec(
+        Integer.parseInt(
+            properties.getProperty(
+                "compaction_read_throughput_mb_per_sec",
+                
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
+
+    CompactionTaskManager.getInstance()
+        
.setCompactionReadOperationRate(conf.getCompactionReadOperationPerSec());
+    CompactionTaskManager.getInstance()
+        
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
+    CompactionTaskManager.getInstance()
+        .setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());
   }
 
   private boolean loadCompactionThreadCountHotModifiedProps(Properties 
properties) {
@@ -1595,13 +1636,6 @@ public class IoTDBDescriptor {
           Long.parseLong(
               properties.getProperty(
                   "slow_query_threshold", 
Long.toString(conf.getSlowQueryThreshold()))));
-      // update merge_write_throughput_mb_per_sec
-      conf.setCompactionWriteThroughputMbPerSec(
-          Integer.parseInt(
-              properties.getProperty(
-                  "merge_write_throughput_mb_per_sec",
-                  
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
-
       // update select into operation max buffer size
       conf.setIntoOperationBufferSizeInByte(
           Long.parseLong(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java
new file mode 100644
index 00000000000..3992b6ab284
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
+
+import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class CompactionTsFileOutput extends OutputStream implements 
TsFileOutput {
+
+  private TsFileOutput output;
+  private RateLimiter rateLimiter;
+  private final int maxSizePerWrite;
+
+  public CompactionTsFileOutput(TsFileOutput output, RateLimiter rateLimiter) {
+    this.output = output;
+    this.rateLimiter = rateLimiter;
+    this.maxSizePerWrite = (int) Math.min((long) rateLimiter.getRate(), 
Integer.MAX_VALUE);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    rateLimiter.acquire(1);
+    output.wrapAsStream().write(b);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte b) throws IOException {
+    rateLimiter.acquire(1);
+    output.write(b);
+  }
+
+  @Override
+  public void write(ByteBuffer b) throws IOException {
+    write(b.array());
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    return output.getPosition();
+  }
+
+  @Override
+  public void close() throws IOException {
+    output.close();
+  }
+
+  @Override
+  public OutputStream wrapAsStream() {
+    return this;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    output.flush();
+  }
+
+  @Override
+  public void truncate(long size) throws IOException {
+    output.truncate(size);
+  }
+
+  @Override
+  public void force() throws IOException {
+    output.force();
+  }
+
+  @Override
+  public void write(byte[] buf, int start, int length) throws IOException {
+    while (length > 0) {
+      int writeSize = Math.min(length, maxSizePerWrite);
+      rateLimiter.acquire(writeSize);
+      output.wrapAsStream().write(buf, start, writeSize);
+      start += writeSize;
+      length -= writeSize;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index 3795904ba57..442ede3f277 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
 
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -70,6 +71,7 @@ public class CompactionTsFileReader extends 
TsFileSequenceReader {
 
   @Override
   protected ByteBuffer readData(long position, int totalSize) throws 
IOException {
+    acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
     ByteBuffer buffer = super.readData(position, totalSize);
     readDataSize.addAndGet(totalSize);
     return buffer;
@@ -194,6 +196,11 @@ public class CompactionTsFileReader extends 
TsFileSequenceReader {
         .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
   }
 
+  private void acquireReadDataSizeWithCompactionReadRateLimiter(int 
readDataSize) {
+    
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
+    
CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
+  }
+
   @Override
   public boolean equals(Object o) {
     return super.equals(o);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
index 834814746f8..f59332b6c53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
@@ -48,6 +48,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
       throws IOException {
     super(file, enableMemoryControl, maxMetadataSize);
     this.type = type;
+    super.out =
+        new CompactionTsFileOutput(
+            super.out, 
CompactionTaskManager.getInstance().getMergeWriteRateLimiter());
   }
 
   public void markStartingWritingAligned() {
@@ -66,7 +69,6 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
     }
     chunkWriter.writeToFileWriter(this);
     long writtenDataSize = this.getPos() - beforeOffset;
-    acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
     CompactionMetrics.getInstance()
         .recordWriteInfo(
             type,
@@ -82,7 +84,6 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
     }
     super.writeChunk(chunk, chunkMetadata);
     long writtenDataSize = this.getPos() - beforeOffset;
-    acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
     CompactionMetrics.getInstance()
         .recordWriteInfo(
             type,
@@ -104,13 +105,11 @@ public class CompactionTsFileWriter extends 
TsFileIOWriter {
     long writtenDataSize = this.getPos() - beforeOffset;
     CompactionMetrics.getInstance()
         .recordWriteInfo(type, CompactionIoDataType.ALIGNED, writtenDataSize);
-    acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
   }
 
   @Override
   public int checkMetadataSizeAndMayFlush() throws IOException {
     int size = super.checkMetadataSizeAndMayFlush();
-    acquireWrittenDataSizeWithCompactionWriteRateLimiter(size);
     CompactionMetrics.getInstance().recordWriteInfo(type, 
CompactionIoDataType.METADATA, size);
     return size;
   }
@@ -120,25 +119,10 @@ public class CompactionTsFileWriter extends 
TsFileIOWriter {
     long beforeSize = this.getPos();
     super.endFile();
     long writtenDataSize = this.getPos() - beforeSize;
-    acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
     CompactionMetrics.getInstance()
         .recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize);
   }
 
-  private void acquireWrittenDataSizeWithCompactionWriteRateLimiter(long 
writtenDataSize) {
-    while (writtenDataSize > 0) {
-      if (writtenDataSize > Integer.MAX_VALUE) {
-        
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(Integer.MAX_VALUE);
-        writtenDataSize -= Integer.MAX_VALUE;
-      } else {
-        CompactionTaskManager.getInstance()
-            .getMergeWriteRateLimiter()
-            .acquire((int) writtenDataSize);
-        return;
-      }
-    }
-  }
-
   public boolean isEmptyTargetFile() {
     return isEmptyTargetFile;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
index 4ce1552412d..ff2ad831fb2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
@@ -21,6 +21,8 @@ package 
org.apache.iotdb.db.storageengine.dataregion.compaction.repair;
 
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
@@ -71,7 +73,12 @@ public class RepairDataFileScanUtil {
 
   public void scanTsFile() {
     File tsfile = resource.getTsFile();
-    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsfile.getPath())) {
+    try (TsFileSequenceReader reader =
+        new CompactionTsFileReader(
+            tsfile.getPath(),
+            resource.isSeq()
+                ? CompactionType.INNER_SEQ_COMPACTION
+                : CompactionType.INNER_UNSEQ_COMPACTION)) {
       TsFileDeviceIterator deviceIterator = 
reader.getAllDevicesIteratorWithIsAligned();
       while (deviceIterator.hasNext()) {
         Pair<String, Boolean> deviceIsAlignedPair = deviceIterator.next();
@@ -195,11 +202,7 @@ public class RepairDataFileScanUtil {
     IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(compressionType);
     byte[] uncompressedData = new byte[pageHeader.getUncompressedSize()];
     unCompressor.uncompress(
-        pageData.array(),
-        0,
-        pageHeader.getCompressedSize(),
-        uncompressedData,
-        pageHeader.getUncompressedSize());
+        pageData.array(), 0, pageHeader.getCompressedSize(), uncompressedData, 
0);
     return ByteBuffer.wrap(uncompressedData);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index fccbe81d5c2..804f86f990e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -196,12 +196,6 @@ public class CompactionScheduler {
           "Compaction task start check failed because disk free ratio is less 
than disk_space_warning_threshold");
       return false;
     }
-    // check task memory cost
-    long allocatedTotalCompactionMemory = 
SystemInfo.getInstance().getMemorySizeForCompaction();
-    long estimatedTaskMemoryCost = task.getEstimatedMemoryCost();
-    if (estimatedTaskMemoryCost < 0 || estimatedTaskMemoryCost > 
allocatedTotalCompactionMemory) {
-      return false;
-    }
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index 7fa2e1158aa..13f0c11a356 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -82,7 +82,21 @@ public class CompactionTaskManager implements IService {
       storageGroupTasks = new ConcurrentHashMap<>();
   private final AtomicInteger finishedTaskNum = new AtomicInteger(0);
 
-  private final RateLimiter mergeWriteRateLimiter = 
RateLimiter.create(Double.MAX_VALUE);
+  private final RateLimiter mergeWriteRateLimiter =
+      RateLimiter.create(
+          config.getCompactionWriteThroughputMbPerSec() <= 0
+              ? Double.MAX_VALUE
+              : config.getCompactionWriteThroughputMbPerSec() * 1024.0 * 
1024.0);
+  private final RateLimiter compactionReadOperationRateLimiter =
+      RateLimiter.create(
+          config.getCompactionReadOperationPerSec() <= 0
+              ? Double.MAX_VALUE
+              : config.getCompactionReadOperationPerSec());
+  private final RateLimiter compactionReadThroughputRateLimiter =
+      RateLimiter.create(
+          config.getCompactionReadThroughputMbPerSec() <= 0
+              ? Double.MAX_VALUE
+              : config.getCompactionReadThroughputMbPerSec() * 1024.0 * 
1024.0);
 
   private volatile boolean init = false;
 
@@ -243,19 +257,36 @@ public class CompactionTaskManager implements IService {
   }
 
   public RateLimiter getMergeWriteRateLimiter() {
-    setWriteMergeRate(
-        
IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
     return mergeWriteRateLimiter;
   }
 
-  private void setWriteMergeRate(final double throughoutMbPerSec) {
-    double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
+  public RateLimiter getCompactionReadRateLimiter() {
+    return compactionReadThroughputRateLimiter;
+  }
+
+  public RateLimiter getCompactionReadOperationRateLimiter() {
+    return compactionReadOperationRateLimiter;
+  }
+
+  public void setWriteMergeRate(final double throughoutMbPerSec) {
+    setRate(mergeWriteRateLimiter, throughoutMbPerSec * 1024.0 * 1024.0);
+  }
+
+  public void setCompactionReadOperationRate(final double readOperationPerSec) 
{
+    setRate(compactionReadOperationRateLimiter, readOperationPerSec);
+  }
+
+  public void setCompactionReadThroughputRate(final double throughputMbPerSec) 
{
+    setRate(compactionReadThroughputRateLimiter, throughputMbPerSec * 1024.0 * 
1024.0);
+  }
+
+  private void setRate(RateLimiter rateLimiter, double rate) {
     // if throughout = 0, disable rate limiting
-    if (throughout <= 0) {
-      throughout = Double.MAX_VALUE;
+    if (rate <= 0) {
+      rate = Double.MAX_VALUE;
     }
-    if (mergeWriteRateLimiter.getRate() != throughout) {
-      mergeWriteRateLimiter.setRate(throughout);
+    if (Math.abs(rateLimiter.getRate() - rate) > 0.0001) {
+      rateLimiter.setRate(rate);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
index 66a6900afb1..9f0a5dbbc7d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
@@ -36,48 +36,44 @@ public class CompactionTaskQueue extends 
FixedPriorityBlockingQueue<AbstractComp
   public AbstractCompactionTask take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     while (true) {
+      AbstractCompactionTask task = null;
       lock.lockInterruptibly();
       try {
         while (queue.isEmpty()) {
           notEmpty.await();
         }
-        AbstractCompactionTask task = tryPollExecutableTask();
-        // task == null indicates that there is no runnable task now
-        if (task != null) {
-          return task;
-        }
+        task = queue.pollFirst();
       } finally {
         lock.unlock();
       }
-      Thread.sleep(TimeUnit.SECONDS.toMillis(1));
-    }
-  }
-
-  private AbstractCompactionTask tryPollExecutableTask() {
-    while (true) {
-      if (queue.isEmpty()) {
-        return null;
-      }
-      AbstractCompactionTask task = queue.pollFirst();
-      if (task == null) {
-        continue;
-      }
-      if (!checkTaskValid(task)) {
-        dropCompactionTask(task);
-        continue;
-      }
-      if (!task.tryOccupyResourcesForRunning()) {
-        queue.add(task);
-        return null;
-      }
-      if (!transitTaskFileStatus(task)) {
-        dropCompactionTask(task);
+      boolean prepareTaskSuccess = prepareTask(task);
+      if (!prepareTaskSuccess) {
+        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
         continue;
       }
       return task;
     }
   }
 
+  private boolean prepareTask(AbstractCompactionTask task) throws 
InterruptedException {
+    if (task == null) {
+      return false;
+    }
+    if (!checkTaskValid(task)) {
+      dropCompactionTask(task);
+      return false;
+    }
+    if (!task.tryOccupyResourcesForRunning()) {
+      put(task);
+      return false;
+    }
+    if (!transitTaskFileStatus(task)) {
+      dropCompactionTask(task);
+      return false;
+    }
+    return true;
+  }
+
   private void dropCompactionTask(AbstractCompactionTask task) {
     task.resetCompactionCandidateStatusForAllSourceFiles();
     task.handleTaskCleanup();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
index d4353a9807e..7df717af4ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
@@ -107,7 +107,9 @@ public class DefaultCompactionTaskComparatorImpl implements 
ICompactionTaskCompa
     // if the max file version of o1 and o2 are different
     // we prefer to execute task with greater file version
     // because we want to compact newly written files
-    if (o1.getMaxFileVersion() != o2.getMaxFileVersion()) {
+    if (o1.getDataRegionId().equals(o2.getDataRegionId())
+        && o1.getTimePartition() == o2.getTimePartition()
+        && o1.getMaxFileVersion() != o2.getMaxFileVersion()) {
       return o2.getMaxFileVersion() > o1.getMaxFileVersion() ? 1 : -1;
     }
 
@@ -116,23 +118,10 @@ public class DefaultCompactionTaskComparatorImpl 
implements ICompactionTaskCompa
 
     // if the number of selected files are different
     // we prefer to execute task with more files
-    if (selectedFilesOfO1.size() != selectedFilesOfO2.size()) {
+    int fileNumDiff = Math.abs(selectedFilesOfO1.size() - 
selectedFilesOfO2.size());
+    if (2 * fileNumDiff >= Math.min(selectedFilesOfO1.size(), 
selectedFilesOfO2.size())) {
       return selectedFilesOfO2.size() - selectedFilesOfO1.size();
     }
-
-    // if the serial id of the tasks are different
-    // we prefer task with small serial id
-    if (o1.getSerialId() != o2.getSerialId()) {
-      return o1.getSerialId() > o2.getSerialId() ? 1 : -1;
-    }
-
-    // if the size of selected files are different
-    // we prefer to execute task with smaller file size
-    // because small files can be compacted quickly
-    if (o1.getSelectedFileSize() != o2.getSelectedFileSize()) {
-      return (int) (o1.getSelectedFileSize() - o2.getSelectedFileSize());
-    }
-
     return 0;
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
index 0d528ebcc3a..7b4ac9d6788 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
@@ -135,4 +135,33 @@ public class RepairDataFileScanUtilTest extends 
AbstractCompactionTest {
     Assert.assertFalse(scanUtil2.isBrokenFile());
     Assert.assertTrue(scanUtil2.hasUnsortedData());
   }
+
+  @Test
+  public void testScanFileWithDifferentCompressionTypes() throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          "s0",
+          new TimeRange[] {new TimeRange(10, 40), new TimeRange(50, 60)},
+          TSEncoding.PLAIN,
+          CompressionType.SNAPPY);
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          "s0",
+          new TimeRange[] {new TimeRange(10, 40), new TimeRange(50, 60)},
+          TSEncoding.PLAIN,
+          CompressionType.GZIP);
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          "s1",
+          new TimeRange[] {new TimeRange(10, 40), new TimeRange(20, 50)},
+          TSEncoding.PLAIN,
+          CompressionType.ZSTD);
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+    scanUtil.scanTsFile();
+    Assert.assertFalse(scanUtil.isBrokenFile());
+    Assert.assertTrue(scanUtil.hasUnsortedData());
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index e6bf64a48b5..ac118c593e2 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -668,9 +668,19 @@ data_replication_factor=1
 
 # The limit of write throughput merge can reach per second
 # values less than or equal to 0 means no limit
-# Datatype: int
+# Datatype: int, Unit: megabyte
 # compaction_write_throughput_mb_per_sec=16
 
+# The limit of read throughput merge can reach per second
+# values less than or equal to 0 means no limit
+# Datatype: int, Unit: megabyte
+# compaction_read_throughput_mb_per_sec=0
+
+# The limit of read operation merge can reach per second
+# values less than or equal to 0 means no limit
+# Datatype: int
+# compaction_read_operation_per_sec=0
+
 # The number of sub compaction threads to be set up to perform compaction.
 # Currently only works for nonAligned data in cross space compaction and unseq 
inner space compaction.
 # Set to 1 when less than or equal to 0.

Reply via email to