This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0d65058b35 [IOTDB-4492] Add necessary log and control total size for
cross space compaction task (#7399)
0d65058b35 is described below
commit 0d65058b359fc0dccb0ff030e6cfc9be25c7a9d2
Author: Mrquan <[email protected]>
AuthorDate: Thu Sep 22 15:23:21 2022 +0800
[IOTDB-4492] Add necessary log and control total size for cross space
compaction task (#7399)
---
.../resources/conf/iotdb-datanode.properties | 5 ++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 ++
.../compaction/cross/CrossSpaceCompactionTask.java | 17 ++++---
.../RewriteCrossSpaceCompactionSelector.java | 57 +++++++++++++++++++++-
5 files changed, 88 insertions(+), 7 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 6a275d7bf3..f4cf85d05c 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -624,6 +624,11 @@ timestamp_precision=ms
# Datatype: int
# max_cross_compaction_candidate_file_num=1000
+# The max total size when selecting cross space compaction candidate files
+# At least one unseq file with its overlapped seq files will be selected even if this number is exceeded
+# Datatype: long, Unit: byte
+# max_cross_compaction_candidate_file_size=5368709120
+
# If one merge file selection runs for more than this time, it will be ended and its current
# selection will be used as final selection.
# When < 0, it means time is unbounded.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c277e6570e..5513ff51c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -476,6 +476,9 @@ public class IoTDBConfig {
/** The max candidate file num in cross space compaction */
private int maxCrossCompactionCandidateFileNum = 1000;
+ /** The max total size of candidate files in cross space compaction */
+ private long maxCrossCompactionCandidateFileSize = 1024 * 1024 * 1024 * 5L;
+
/** The interval of compaction task scheduling in each virtual storage group. The unit is ms. */
private long compactionScheduleIntervalInMs = 60_000L;
@@ -2794,6 +2797,14 @@ public class IoTDBConfig {
this.maxCrossCompactionCandidateFileNum =
maxCrossCompactionCandidateFileNum;
}
+ public long getMaxCrossCompactionCandidateFileSize() {
+ return maxCrossCompactionCandidateFileSize;
+ }
+
+ public void setMaxCrossCompactionCandidateFileSize(long
maxCrossCompactionCandidateFileSize) {
+ this.maxCrossCompactionCandidateFileSize =
maxCrossCompactionCandidateFileSize;
+ }
+
public long getCompactionSubmissionIntervalInMs() {
return compactionSubmissionIntervalInMs;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 39d590c3ec..2d0ac129d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -671,6 +671,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"max_cross_compaction_candidate_file_num",
Integer.toString(conf.getMaxCrossCompactionCandidateFileNum()))));
+ conf.setMaxCrossCompactionCandidateFileSize(
+ Long.parseLong(
+ properties.getProperty(
+ "max_cross_compaction_candidate_file_size",
+
Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
conf.setCompactionWriteThroughputMbPerSec(
Integer.parseInt(
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
index 8982220e13..650404a6fc 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
@@ -57,7 +57,8 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
protected List<TsFileResource> targetTsfileResourceList;
protected List<TsFileResource> holdReadLockList = new ArrayList<>();
protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
- protected long selectedFileSize = 0;
+ protected double selectedSeqFileSize = 0;
+ protected double selectedUnseqFileSize = 0;
protected long memoryCost = 0L;
public CrossSpaceCompactionTask(
@@ -112,19 +113,23 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
}
for (TsFileResource resource : selectedSequenceFiles) {
- selectedFileSize += resource.getTsFileSize();
+ selectedSeqFileSize += resource.getTsFileSize();
}
+
for (TsFileResource resource : selectedUnsequenceFiles) {
- selectedFileSize += resource.getTsFileSize();
+ selectedUnseqFileSize += resource.getTsFileSize();
}
LOGGER.info(
- "{}-{} [Compaction] CrossSpaceCompactionTask start. Sequence files :
{}, unsequence files : {}, total size is {} MB",
+ "{}-{} [Compaction] CrossSpaceCompactionTask start. Sequence files :
{}, unsequence files : {}, "
+ + "sequence files size is {} MB, unsequence file size is {} MB,
total size is {}",
storageGroupName,
dataRegionId,
selectedSequenceFiles,
selectedUnsequenceFiles,
- ((double) selectedFileSize) / 1024.0 / 1024.0);
+ selectedSeqFileSize / 1024 / 1024,
+ selectedUnseqFileSize / 1024 / 1024,
+ (selectedSeqFileSize + selectedUnseqFileSize) / 1024 / 1024);
logFile =
new File(
selectedSequenceFiles.get(0).getTsFile().getParent()
@@ -175,7 +180,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
storageGroupName,
dataRegionId,
costTime,
- ((double) selectedFileSize) / 1024.0d / 1024.0d / costTime);
+ (selectedSeqFileSize + selectedUnseqFileSize) / 1024 / 1024 /
costTime);
}
} catch (Throwable throwable) {
// catch throwable to handle OOM errors
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
index 35d4eb7653..b314757ef8 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.engine.compaction.cross.ICrossSpaceSelector;
import
org.apache.iotdb.db.engine.compaction.cross.utils.AbstractCompactionEstimator;
import org.apache.iotdb.db.engine.compaction.task.ICompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.rescon.SystemInfo;
@@ -46,6 +47,7 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private final int SELECT_WARN_THRESHOLD = 10;
protected String logicalStorageGroupName;
protected String dataRegionId;
protected long timePartition;
@@ -54,8 +56,10 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
private CrossSpaceCompactionResource resource;
private long totalCost;
+ private long totalSize;
private final long memoryBudget;
private final int maxCrossCompactionFileNum;
+ private final long maxCrossCompactionFileSize;
private List<TsFileResource> selectedUnseqFiles;
private List<TsFileResource> selectedSeqFiles;
@@ -81,6 +85,9 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
/
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
this.maxCrossCompactionFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileNum();
+ this.maxCrossCompactionFileSize =
+
IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileSize();
+
this.compactionEstimator =
ICompactionSelector.getCompactionEstimator(
IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer(), false);
@@ -156,6 +163,7 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
selectedUnseqFiles = new ArrayList<>();
totalCost = 0;
+ totalSize = 0;
int unseqIndex = 0;
long startTime = System.currentTimeMillis();
@@ -190,8 +198,12 @@ public class RewriteCrossSpaceCompactionSelector
implements ICrossSpaceSelector
List<TsFileResource> tmpSelectedSeqFileResources = new ArrayList<>();
for (int seqIndex : tmpSelectedSeqFiles) {
- tmpSelectedSeqFileResources.add(resource.getSeqFiles().get(seqIndex));
+ TsFileResource tsFileResource = resource.getSeqFiles().get(seqIndex);
+ tmpSelectedSeqFileResources.add(tsFileResource);
+ totalSize += tsFileResource.getTsFileSize();
}
+ totalSize += unseqFile.getTsFileSize();
+
long newCost =
compactionEstimator.estimateCrossCompactionMemory(tmpSelectedSeqFileResources,
unseqFile);
if (!updateSelectedFiles(newCost, unseqFile)) {
@@ -214,6 +226,7 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
if (selectedUnseqFiles.size() == 0
|| (seqSelectedNum + selectedUnseqFiles.size() + 1 +
tmpSelectedSeqFiles.size()
<= maxCrossCompactionFileNum
+ && totalSize <= maxCrossCompactionFileSize
&& totalCost + newCost < memoryBudget)) {
selectedUnseqFiles.add(unseqFile);
@@ -275,12 +288,32 @@ public class RewriteCrossSpaceCompactionSelector
implements ICrossSpaceSelector
continue;
}
+ int crossSpaceCompactionTimes = 0;
+ try {
+ TsFileNameGenerator.TsFileName tsFileName =
+ TsFileNameGenerator.getTsFileName(seqFile.getTsFile().getName());
+ crossSpaceCompactionTimes = tsFileName.getCrossCompactionCnt();
+ } catch (IOException e) {
+ LOGGER.warn("Meets IOException when selecting files for cross space
compaction", e);
+ }
+
long seqEndTime = seqFile.getEndTime(deviceId);
long seqStartTime = seqFile.getStartTime(deviceId);
if (!seqFile.isClosed()) {
// for unclosed file, only select those that overlap with the unseq
file
if (unseqEndTime >= seqStartTime) {
tmpSelectedSeqFiles.add(i);
+ if (crossSpaceCompactionTimes >= SELECT_WARN_THRESHOLD) {
+ LOGGER.warn(
+ "{} is selected for cross space compaction, it is overlapped
with {} in device {}. Sequence file time range:[{},{}], Unsequence file time
range:[{},{}]",
+ seqFile.getTsFile().getAbsolutePath(),
+ unseqFile.getTsFile().getAbsolutePath(),
+ deviceId,
+ seqStartTime,
+ seqEndTime,
+ unseqStartTime,
+ unseqEndTime);
+ }
}
} else if (unseqEndTime <= seqEndTime) {
// if time range in unseq file is 10-20, seq file is 30-40, or
@@ -288,10 +321,32 @@ public class RewriteCrossSpaceCompactionSelector
implements ICrossSpaceSelector
// there is no more overlap later.
tmpSelectedSeqFiles.add(i);
noMoreOverlap = true;
+ if (crossSpaceCompactionTimes >= SELECT_WARN_THRESHOLD) {
+ LOGGER.warn(
+ "{} is selected for cross space compaction, it is overlapped
with {} in device {}. Sequence file time range:[{},{}], Unsequence file time
range:[{},{}]",
+ seqFile.getTsFile().getAbsolutePath(),
+ unseqFile.getTsFile().getAbsolutePath(),
+ deviceId,
+ seqStartTime,
+ seqEndTime,
+ unseqStartTime,
+ unseqEndTime);
+ }
} else if (unseqStartTime <= seqEndTime) {
// if time range in unseq file is 10-20, seq file is 0-15, then
select this seq file and
// there may be overlap later.
tmpSelectedSeqFiles.add(i);
+ if (crossSpaceCompactionTimes >= SELECT_WARN_THRESHOLD) {
+ LOGGER.warn(
+ "{} is selected for cross space compaction, it is overlapped
with {} in device {}. Sequence file time range:[{},{}], Unsequence file time
range:[{},{}]",
+ seqFile.getTsFile().getAbsolutePath(),
+ unseqFile.getTsFile().getAbsolutePath(),
+ deviceId,
+ seqStartTime,
+ seqEndTime,
+ unseqStartTime,
+ unseqEndTime);
+ }
}
}
}