This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch zy131
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/zy131 by this push:
new 175e8d7f198 zy131 compaction cp commits (#12565)
175e8d7f198 is described below
commit 175e8d7f198a2d9323036a06f40b4c1ae3bd02e0
Author: shuwenwei <[email protected]>
AuthorDate: Tue May 21 18:30:21 2024 +0800
zy131 compaction cp commits (#12565)
* Fix uncompress byte buffer in RepairDataFileScanUtil
* Modify compaction task priority comparator (#12255)
* delay estimate memory of InnerSpaceCompactionTask (#12314)
* Compaction rate limit (#12312)
* add compaction read rate limiter
* modify method name
* modify default compaction read rate limit
* add config item to limit compaction read speed
* add config item to limit compaction read speed
* fix throughput rate is not correct
* fix some issues
* compaction read rate has no limit in default config
* use correct unit when init rate limiter
* modify maxSizePerWrite
* remove useless code
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 27 +++++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 48 ++++++++--
.../compaction/io/CompactionTsFileOutput.java | 104 +++++++++++++++++++++
.../compaction/io/CompactionTsFileReader.java | 7 ++
.../compaction/io/CompactionTsFileWriter.java | 22 +----
.../compaction/repair/RepairDataFileScanUtil.java | 15 +--
.../compaction/schedule/CompactionScheduler.java | 6 --
.../compaction/schedule/CompactionTaskManager.java | 49 ++++++++--
.../compaction/schedule/CompactionTaskQueue.java | 52 +++++------
.../DefaultCompactionTaskComparatorImpl.java | 21 +----
.../repair/RepairDataFileScanUtilTest.java | 29 ++++++
.../resources/conf/iotdb-common.properties | 12 ++-
12 files changed, 299 insertions(+), 93 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f70e3f8d8d1..8ddd2e1cbf6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -685,9 +685,18 @@ public class IoTDBConfig {
*/
private long mergeIntervalSec = 0L;
- /** The limit of compaction merge can reach per second */
+ /** The limit of compaction merge can reach per second. When <= 0, no limit.
unit: megabyte */
private int compactionWriteThroughputMbPerSec = 16;
+ /**
+ * The limit of compaction read throughput can reach per second. When <= 0,
no limit. unit:
+ * megabyte
+ */
+ private int compactionReadThroughputMbPerSec = 0;
+
+ /** The limit of compaction read operation can reach per second. When <= 0,
no limit. */
+ private int compactionReadOperationPerSec = 0;
+
/**
* How many thread will be set up to perform compaction, 10 by default. Set
to 1 when less than or
* equal to 0.
@@ -2047,6 +2056,22 @@ public class IoTDBConfig {
this.memtableSizeThreshold = memtableSizeThreshold;
}
+ public int getCompactionReadThroughputMbPerSec() {
+ return compactionReadThroughputMbPerSec;
+ }
+
+ public void setCompactionReadThroughputMbPerSec(int
compactionReadThroughputMbPerSec) {
+ this.compactionReadThroughputMbPerSec = compactionReadThroughputMbPerSec;
+ }
+
+ public int getCompactionReadOperationPerSec() {
+ return compactionReadOperationPerSec;
+ }
+
+ public void setCompactionReadOperationPerSec(int
compactionReadOperationPerSec) {
+ this.compactionReadOperationPerSec = compactionReadOperationPerSec;
+ }
+
public boolean isEnableTimedFlushSeqMemtable() {
return enableTimedFlushSeqMemtable;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 731e460333c..4d98da7ebc1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -657,6 +657,18 @@ public class IoTDBDescriptor {
"compaction_write_throughput_mb_per_sec",
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+ conf.setCompactionReadThroughputMbPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_read_throughput_mb_per_sec",
+
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
+
+ conf.setCompactionReadOperationPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_read_operation_per_sec",
+ Integer.toString(conf.getCompactionReadOperationPerSec()))));
+
conf.setEnableTsFileValidation(
Boolean.parseBoolean(
properties.getProperty(
@@ -1128,6 +1140,35 @@ public class IoTDBDescriptor {
if (restartCompactionTaskManager) {
CompactionTaskManager.getInstance().restart();
}
+ // hot load compaction rate limit configurations
+
+ // update merge_write_throughput_mb_per_sec
+ conf.setCompactionWriteThroughputMbPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "merge_write_throughput_mb_per_sec",
+
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+
+ // update compaction_read_operation_per_sec
+ conf.setCompactionReadOperationPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_read_operation_per_sec",
+ Integer.toString(conf.getCompactionReadOperationPerSec()))));
+
+ // update compaction_read_throughput_mb_per_sec
+ conf.setCompactionReadThroughputMbPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_read_throughput_mb_per_sec",
+
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
+
+ CompactionTaskManager.getInstance()
+
.setCompactionReadOperationRate(conf.getCompactionReadOperationPerSec());
+ CompactionTaskManager.getInstance()
+
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
+ CompactionTaskManager.getInstance()
+ .setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());
}
private boolean loadCompactionThreadCountHotModifiedProps(Properties
properties) {
@@ -1595,13 +1636,6 @@ public class IoTDBDescriptor {
Long.parseLong(
properties.getProperty(
"slow_query_threshold",
Long.toString(conf.getSlowQueryThreshold()))));
- // update merge_write_throughput_mb_per_sec
- conf.setCompactionWriteThroughputMbPerSec(
- Integer.parseInt(
- properties.getProperty(
- "merge_write_throughput_mb_per_sec",
-
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
-
// update select into operation max buffer size
conf.setIntoOperationBufferSizeInByte(
Long.parseLong(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java
new file mode 100644
index 00000000000..3992b6ab284
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
+
+import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class CompactionTsFileOutput extends OutputStream implements
TsFileOutput {
+
+ private TsFileOutput output;
+ private RateLimiter rateLimiter;
+ private final int maxSizePerWrite;
+
+ public CompactionTsFileOutput(TsFileOutput output, RateLimiter rateLimiter) {
+ this.output = output;
+ this.rateLimiter = rateLimiter;
+ this.maxSizePerWrite = (int) Math.min((long) rateLimiter.getRate(),
Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ rateLimiter.acquire(1);
+ output.wrapAsStream().write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte b) throws IOException {
+ rateLimiter.acquire(1);
+ output.write(b);
+ }
+
+ @Override
+ public void write(ByteBuffer b) throws IOException {
+ write(b.array());
+ }
+
+ @Override
+ public long getPosition() throws IOException {
+ return output.getPosition();
+ }
+
+ @Override
+ public void close() throws IOException {
+ output.close();
+ }
+
+ @Override
+ public OutputStream wrapAsStream() {
+ return this;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ output.flush();
+ }
+
+ @Override
+ public void truncate(long size) throws IOException {
+ output.truncate(size);
+ }
+
+ @Override
+ public void force() throws IOException {
+ output.force();
+ }
+
+ @Override
+ public void write(byte[] buf, int start, int length) throws IOException {
+ while (length > 0) {
+ int writeSize = Math.min(length, maxSizePerWrite);
+ rateLimiter.acquire(writeSize);
+ output.wrapAsStream().write(buf, start, writeSize);
+ start += writeSize;
+ length -= writeSize;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index 3795904ba57..442ede3f277 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -70,6 +71,7 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
@Override
protected ByteBuffer readData(long position, int totalSize) throws
IOException {
+ acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
ByteBuffer buffer = super.readData(position, totalSize);
readDataSize.addAndGet(totalSize);
return buffer;
@@ -194,6 +196,11 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
.recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
}
+ private void acquireReadDataSizeWithCompactionReadRateLimiter(int
readDataSize) {
+
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
+
CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
+ }
+
@Override
public boolean equals(Object o) {
return super.equals(o);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
index 834814746f8..f59332b6c53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
@@ -48,6 +48,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
throws IOException {
super(file, enableMemoryControl, maxMetadataSize);
this.type = type;
+ super.out =
+ new CompactionTsFileOutput(
+ super.out,
CompactionTaskManager.getInstance().getMergeWriteRateLimiter());
}
public void markStartingWritingAligned() {
@@ -66,7 +69,6 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
}
chunkWriter.writeToFileWriter(this);
long writtenDataSize = this.getPos() - beforeOffset;
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
CompactionMetrics.getInstance()
.recordWriteInfo(
type,
@@ -82,7 +84,6 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
}
super.writeChunk(chunk, chunkMetadata);
long writtenDataSize = this.getPos() - beforeOffset;
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
CompactionMetrics.getInstance()
.recordWriteInfo(
type,
@@ -104,13 +105,11 @@ public class CompactionTsFileWriter extends
TsFileIOWriter {
long writtenDataSize = this.getPos() - beforeOffset;
CompactionMetrics.getInstance()
.recordWriteInfo(type, CompactionIoDataType.ALIGNED, writtenDataSize);
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
}
@Override
public int checkMetadataSizeAndMayFlush() throws IOException {
int size = super.checkMetadataSizeAndMayFlush();
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(size);
CompactionMetrics.getInstance().recordWriteInfo(type,
CompactionIoDataType.METADATA, size);
return size;
}
@@ -120,25 +119,10 @@ public class CompactionTsFileWriter extends
TsFileIOWriter {
long beforeSize = this.getPos();
super.endFile();
long writtenDataSize = this.getPos() - beforeSize;
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
CompactionMetrics.getInstance()
.recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize);
}
- private void acquireWrittenDataSizeWithCompactionWriteRateLimiter(long
writtenDataSize) {
- while (writtenDataSize > 0) {
- if (writtenDataSize > Integer.MAX_VALUE) {
-
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(Integer.MAX_VALUE);
- writtenDataSize -= Integer.MAX_VALUE;
- } else {
- CompactionTaskManager.getInstance()
- .getMergeWriteRateLimiter()
- .acquire((int) writtenDataSize);
- return;
- }
- }
- }
-
public boolean isEmptyTargetFile() {
return isEmptyTargetFile;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
index 4ce1552412d..ff2ad831fb2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.repair;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
@@ -71,7 +73,12 @@ public class RepairDataFileScanUtil {
public void scanTsFile() {
File tsfile = resource.getTsFile();
- try (TsFileSequenceReader reader = new
TsFileSequenceReader(tsfile.getPath())) {
+ try (TsFileSequenceReader reader =
+ new CompactionTsFileReader(
+ tsfile.getPath(),
+ resource.isSeq()
+ ? CompactionType.INNER_SEQ_COMPACTION
+ : CompactionType.INNER_UNSEQ_COMPACTION)) {
TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
while (deviceIterator.hasNext()) {
Pair<String, Boolean> deviceIsAlignedPair = deviceIterator.next();
@@ -195,11 +202,7 @@ public class RepairDataFileScanUtil {
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(compressionType);
byte[] uncompressedData = new byte[pageHeader.getUncompressedSize()];
unCompressor.uncompress(
- pageData.array(),
- 0,
- pageHeader.getCompressedSize(),
- uncompressedData,
- pageHeader.getUncompressedSize());
+ pageData.array(), 0, pageHeader.getCompressedSize(), uncompressedData,
0);
return ByteBuffer.wrap(uncompressedData);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index fccbe81d5c2..804f86f990e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -196,12 +196,6 @@ public class CompactionScheduler {
"Compaction task start check failed because disk free ratio is less
than disk_space_warning_threshold");
return false;
}
- // check task memory cost
- long allocatedTotalCompactionMemory =
SystemInfo.getInstance().getMemorySizeForCompaction();
- long estimatedTaskMemoryCost = task.getEstimatedMemoryCost();
- if (estimatedTaskMemoryCost < 0 || estimatedTaskMemoryCost >
allocatedTotalCompactionMemory) {
- return false;
- }
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index 7fa2e1158aa..13f0c11a356 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -82,7 +82,21 @@ public class CompactionTaskManager implements IService {
storageGroupTasks = new ConcurrentHashMap<>();
private final AtomicInteger finishedTaskNum = new AtomicInteger(0);
- private final RateLimiter mergeWriteRateLimiter =
RateLimiter.create(Double.MAX_VALUE);
+ private final RateLimiter mergeWriteRateLimiter =
+ RateLimiter.create(
+ config.getCompactionWriteThroughputMbPerSec() <= 0
+ ? Double.MAX_VALUE
+ : config.getCompactionWriteThroughputMbPerSec() * 1024.0 *
1024.0);
+ private final RateLimiter compactionReadOperationRateLimiter =
+ RateLimiter.create(
+ config.getCompactionReadOperationPerSec() <= 0
+ ? Double.MAX_VALUE
+ : config.getCompactionReadOperationPerSec());
+ private final RateLimiter compactionReadThroughputRateLimiter =
+ RateLimiter.create(
+ config.getCompactionReadThroughputMbPerSec() <= 0
+ ? Double.MAX_VALUE
+ : config.getCompactionReadThroughputMbPerSec() * 1024.0 *
1024.0);
private volatile boolean init = false;
@@ -243,19 +257,36 @@ public class CompactionTaskManager implements IService {
}
public RateLimiter getMergeWriteRateLimiter() {
- setWriteMergeRate(
-
IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
return mergeWriteRateLimiter;
}
- private void setWriteMergeRate(final double throughoutMbPerSec) {
- double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
+ public RateLimiter getCompactionReadRateLimiter() {
+ return compactionReadThroughputRateLimiter;
+ }
+
+ public RateLimiter getCompactionReadOperationRateLimiter() {
+ return compactionReadOperationRateLimiter;
+ }
+
+ public void setWriteMergeRate(final double throughoutMbPerSec) {
+ setRate(mergeWriteRateLimiter, throughoutMbPerSec * 1024.0 * 1024.0);
+ }
+
+ public void setCompactionReadOperationRate(final double readOperationPerSec)
{
+ setRate(compactionReadOperationRateLimiter, readOperationPerSec);
+ }
+
+ public void setCompactionReadThroughputRate(final double throughputMbPerSec)
{
+ setRate(compactionReadThroughputRateLimiter, throughputMbPerSec * 1024.0 *
1024.0);
+ }
+
+ private void setRate(RateLimiter rateLimiter, double rate) {
// if throughout = 0, disable rate limiting
- if (throughout <= 0) {
- throughout = Double.MAX_VALUE;
+ if (rate <= 0) {
+ rate = Double.MAX_VALUE;
}
- if (mergeWriteRateLimiter.getRate() != throughout) {
- mergeWriteRateLimiter.setRate(throughout);
+ if (Math.abs(rateLimiter.getRate() - rate) > 0.0001) {
+ rateLimiter.setRate(rate);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
index 66a6900afb1..9f0a5dbbc7d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
@@ -36,48 +36,44 @@ public class CompactionTaskQueue extends
FixedPriorityBlockingQueue<AbstractComp
public AbstractCompactionTask take() throws InterruptedException {
final ReentrantLock lock = this.lock;
while (true) {
+ AbstractCompactionTask task = null;
lock.lockInterruptibly();
try {
while (queue.isEmpty()) {
notEmpty.await();
}
- AbstractCompactionTask task = tryPollExecutableTask();
- // task == null indicates that there is no runnable task now
- if (task != null) {
- return task;
- }
+ task = queue.pollFirst();
} finally {
lock.unlock();
}
- Thread.sleep(TimeUnit.SECONDS.toMillis(1));
- }
- }
-
- private AbstractCompactionTask tryPollExecutableTask() {
- while (true) {
- if (queue.isEmpty()) {
- return null;
- }
- AbstractCompactionTask task = queue.pollFirst();
- if (task == null) {
- continue;
- }
- if (!checkTaskValid(task)) {
- dropCompactionTask(task);
- continue;
- }
- if (!task.tryOccupyResourcesForRunning()) {
- queue.add(task);
- return null;
- }
- if (!transitTaskFileStatus(task)) {
- dropCompactionTask(task);
+ boolean prepareTaskSuccess = prepareTask(task);
+ if (!prepareTaskSuccess) {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
continue;
}
return task;
}
}
+ private boolean prepareTask(AbstractCompactionTask task) throws
InterruptedException {
+ if (task == null) {
+ return false;
+ }
+ if (!checkTaskValid(task)) {
+ dropCompactionTask(task);
+ return false;
+ }
+ if (!task.tryOccupyResourcesForRunning()) {
+ put(task);
+ return false;
+ }
+ if (!transitTaskFileStatus(task)) {
+ dropCompactionTask(task);
+ return false;
+ }
+ return true;
+ }
+
private void dropCompactionTask(AbstractCompactionTask task) {
task.resetCompactionCandidateStatusForAllSourceFiles();
task.handleTaskCleanup();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
index d4353a9807e..7df717af4ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
@@ -107,7 +107,9 @@ public class DefaultCompactionTaskComparatorImpl implements
ICompactionTaskCompa
// if the max file version of o1 and o2 are different
// we prefer to execute task with greater file version
// because we want to compact newly written files
- if (o1.getMaxFileVersion() != o2.getMaxFileVersion()) {
+ if (o1.getDataRegionId().equals(o2.getDataRegionId())
+ && o1.getTimePartition() == o2.getTimePartition()
+ && o1.getMaxFileVersion() != o2.getMaxFileVersion()) {
return o2.getMaxFileVersion() > o1.getMaxFileVersion() ? 1 : -1;
}
@@ -116,23 +118,10 @@ public class DefaultCompactionTaskComparatorImpl
implements ICompactionTaskCompa
// if the number of selected files are different
// we prefer to execute task with more files
- if (selectedFilesOfO1.size() != selectedFilesOfO2.size()) {
+ int fileNumDiff = Math.abs(selectedFilesOfO1.size() -
selectedFilesOfO2.size());
+ if (2 * fileNumDiff >= Math.min(selectedFilesOfO1.size(),
selectedFilesOfO2.size())) {
return selectedFilesOfO2.size() - selectedFilesOfO1.size();
}
-
- // if the serial id of the tasks are different
- // we prefer task with small serial id
- if (o1.getSerialId() != o2.getSerialId()) {
- return o1.getSerialId() > o2.getSerialId() ? 1 : -1;
- }
-
- // if the size of selected files are different
- // we prefer to execute task with smaller file size
- // because small files can be compacted quickly
- if (o1.getSelectedFileSize() != o2.getSelectedFileSize()) {
- return (int) (o1.getSelectedFileSize() - o2.getSelectedFileSize());
- }
-
return 0;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
index 0d528ebcc3a..7b4ac9d6788 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
@@ -135,4 +135,33 @@ public class RepairDataFileScanUtilTest extends
AbstractCompactionTest {
Assert.assertFalse(scanUtil2.isBrokenFile());
Assert.assertTrue(scanUtil2.hasUnsortedData());
}
+
+ @Test
+ public void testScanFileWithDifferentCompressionTypes() throws IOException {
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s0",
+ new TimeRange[] {new TimeRange(10, 40), new TimeRange(50, 60)},
+ TSEncoding.PLAIN,
+ CompressionType.SNAPPY);
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s0",
+ new TimeRange[] {new TimeRange(10, 40), new TimeRange(50, 60)},
+ TSEncoding.PLAIN,
+ CompressionType.GZIP);
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s1",
+ new TimeRange[] {new TimeRange(10, 40), new TimeRange(20, 50)},
+ TSEncoding.PLAIN,
+ CompressionType.ZSTD);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+ scanUtil.scanTsFile();
+ Assert.assertFalse(scanUtil.isBrokenFile());
+ Assert.assertTrue(scanUtil.hasUnsortedData());
+ }
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index e6bf64a48b5..ac118c593e2 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -668,9 +668,19 @@ data_replication_factor=1
# The limit of write throughput merge can reach per second
# values less than or equal to 0 means no limit
-# Datatype: int
+# Datatype: int, Unit: megabyte
# compaction_write_throughput_mb_per_sec=16
+# The limit of read throughput merge can reach per second
+# values less than or equal to 0 means no limit
+# Datatype: int, Unit: megabyte
+# compaction_read_throughput_mb_per_sec=0
+
+# The limit of read operation merge can reach per second
+# values less than or equal to 0 means no limit
+# Datatype: int
+# compaction_read_operation_per_sec=0
+
# The number of sub compaction threads to be set up to perform compaction.
# Currently only works for nonAligned data in cross space compaction and unseq
inner space compaction.
# Set to 1 when less than or equal to 0.