This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 185927e500 [To rel/0.13][IOTDB-5776]Update memory estimation of cross
space compaction (#9623)
185927e500 is described below
commit 185927e500501b48ed486b34496564603a4d5e31
Author: 周沛辰 <[email protected]>
AuthorDate: Wed Apr 19 11:36:33 2023 +0800
[To rel/0.13][IOTDB-5776]Update memory estimation of cross space compaction
(#9623)
---
.../resources/conf/iotdb-engine.properties | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +-
.../selector/AbstractCompactionEstimator.java | 6 ++
.../selector/RewriteCompactionFileSelector.java | 12 ++-
.../selector/RewriteCrossCompactionEstimator.java | 108 +++++++++++++--------
.../org/apache/iotdb/db/rescon/SystemInfo.java | 3 +
.../cross/RewriteCompactionFileSelectorTest.java | 4 +-
8 files changed, 111 insertions(+), 48 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index ed6a504db1..9a626ceb53 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -453,6 +453,11 @@ timestamp_precision=ms
# BALANCE: alternate two compaction types
# compaction_priority=BALANCE
+# Whether to enable compaction memory control. If true and estimated memory
size of
+# one compaction task exceeds the threshold, system will block it. It only
works for cross space compaction currently.
+# Datatype: boolean
+# enable_compaction_mem_control=true
+
# size proportion for chunk metadata maintains in memory when compacting
# Datatype: double
# chunk_metadata_memory_size_proportion=0.1
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f97a1d2d22..449ee57d40 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -404,6 +404,13 @@ public class IoTDBConfig {
*/
private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
+ /**
+ * Enable compaction memory control or not. If true and estimated memory
size of one compaction
+ * task exceeds the threshold, system will block the compaction. It only
works for cross space
+ * compaction currently.
+ */
+ private boolean enableCompactionMemControl = true;
+
private double chunkMetadataMemorySizeProportion = 0.1;
/** The target tsfile size in compaction, 1 GB by default */
@@ -2823,6 +2830,14 @@ public class IoTDBConfig {
this.customizedProperties = customizedProperties;
}
+ public boolean isEnableCompactionMemControl() {
+ return enableCompactionMemControl;
+ }
+
+ public void setEnableCompactionMemControl(boolean
enableCompactionMemControl) {
+ this.enableCompactionMemControl = enableCompactionMemControl;
+ }
+
public double getChunkMetadataMemorySizeProportion() {
return chunkMetadataMemorySizeProportion;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2616bbb15e..1ddb246600 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -344,7 +344,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"max_waiting_time_when_insert_blocked",
Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
-
+ conf.setEnableCompactionMemControl(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_compaction_mem_control",
+ Boolean.toString(conf.isEnableCompactionMemControl()))));
conf.setChunkMetadataMemorySizeProportion(
Double.parseDouble(
properties.getProperty(
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/AbstractCompactionEstimator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/AbstractCompactionEstimator.java
index f891b73ab5..51f410f3c9 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/AbstractCompactionEstimator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/AbstractCompactionEstimator.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.engine.compaction.cross.rewrite.selector;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -34,6 +36,10 @@ public abstract class AbstractCompactionEstimator implements
AutoCloseable {
protected Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new
HashMap<>();
+ protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ protected int compressionRatio = 5;
+
/**
* Estimate the memory cost of compacting the unseq file and its
corresponding overlapped seq
* files in cross space compaction task.
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
index 1f12fd5f24..3c95135dfc 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
@@ -56,6 +56,9 @@ public class RewriteCompactionFileSelector implements
ICrossSpaceMergeFileSelect
CrossSpaceCompactionResource resource;
long totalCost;
+
+ private long totalUnseqFileSize;
+ private long totalSeqFileSize;
private long memoryBudget;
private long maxSeqFileCost;
private int maxCrossCompactionFileNum;
@@ -145,11 +148,14 @@ public class RewriteCompactionFileSelector implements
ICrossSpaceMergeFileSelect
}
if (logger.isInfoEnabled()) {
logger.info(
- "Selected merge candidates, {} seqFiles, {} unseqFiles, total memory
cost {}, "
+ "Selected merge candidates, {} seqFiles, {} unseqFiles, total memory
cost {} MB, total file size is {} MB, total seq file size is {} MB, total unseq
file size is {} MB, "
+ "time consumption {}ms",
selectedSeqFiles.size(),
selectedUnseqFiles.size(),
- totalCost,
+ (float) totalCost / 1024 / 1024,
+ (float) (totalUnseqFileSize + totalSeqFileSize) / 1024 / 1024,
+ (float) totalSeqFileSize / 1024 / 1024,
+ (float) totalUnseqFileSize / 1024 / 1024,
System.currentTimeMillis() - startTime);
}
return new List[] {selectedSeqFiles, selectedUnseqFiles};
@@ -230,6 +236,7 @@ public class RewriteCompactionFileSelector implements
ICrossSpaceMergeFileSelect
for (int i = 0; i < seqSelected.length; i++) {
if (seqSelected[i]) {
selectedSeqFiles.add(resource.getSeqFiles().get(i));
+ totalSeqFileSize += resource.getSeqFiles().get(i).getTsFileSize();
}
}
}
@@ -260,6 +267,7 @@ public class RewriteCompactionFileSelector implements
ICrossSpaceMergeFileSelect
&& totalSize <= maxCrossCompactionFileSize
&& totalCost + newCost < memoryBudget)) {
selectedUnseqFiles.add(unseqFile);
+ totalUnseqFileSize += unseqFile.getTsFileSize();
maxSeqFileCost = tempMaxSeqFileCost;
for (Integer seqIdx : tmpSelectedSeqFiles) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCrossCompactionEstimator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCrossCompactionEstimator.java
index 1908b1f684..49998d53a5 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCrossCompactionEstimator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCrossCompactionEstimator.java
@@ -23,13 +23,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -41,13 +39,10 @@ public class RewriteCrossCompactionEstimator extends
AbstractCrossSpaceEstimator
// task
private long maxCostOfReadingSeqFile;
- // left is the max chunk num in chunkgroup of unseq file, right is the total
chunk num of unseq
- // file.
- private Pair<Integer, Integer> maxUnseqChunkNumInDevice;
+ // the max cost of writing target file
+ private long maxCostOfWritingTargetFile;
- // it stores all chunk info of seq files. Left is the max chunk num in
chunkgroup of seq file,
- // right is the total chunk num of seq file.
- private final List<Pair<Integer, Integer>> maxSeqChunkNumInDeviceList;
+ private int maxConcurrentSeriesNum = 1;
// the number of timeseries being compacted at the same time
private final int subCompactionTaskNum =
@@ -55,7 +50,7 @@ public class RewriteCrossCompactionEstimator extends
AbstractCrossSpaceEstimator
public RewriteCrossCompactionEstimator() {
this.maxCostOfReadingSeqFile = 0;
- this.maxSeqChunkNumInDeviceList = new ArrayList<>();
+ this.maxCostOfWritingTargetFile = 0;
}
@Override
@@ -65,7 +60,6 @@ public class RewriteCrossCompactionEstimator extends
AbstractCrossSpaceEstimator
cost += calculateReadingUnseqFile(unseqResource);
cost += calculateReadingSeqFiles(seqResources);
cost += calculatingWritingTargetFiles(seqResources, unseqResource);
- maxSeqChunkNumInDeviceList.clear();
return cost;
}
@@ -75,25 +69,28 @@ public class RewriteCrossCompactionEstimator extends
AbstractCrossSpaceEstimator
*/
private long calculateReadingUnseqFile(TsFileResource unseqResource) throws
IOException {
TsFileSequenceReader reader = getFileReader(unseqResource);
- int[] fileInfo = getSeriesAndDeviceChunkNum(reader);
+ FileInfo fileInfo = getSeriesAndDeviceChunkNum(reader);
// it is max aligned series num of one device when tsfile contains aligned
series,
// else is sub compaction task num.
- int concurrentSeriesNum = fileInfo[2] == -1 ? subCompactionTaskNum :
fileInfo[2];
- maxUnseqChunkNumInDevice = new Pair<>(fileInfo[3], fileInfo[0]);
- // it means the max size of a timeseries in this file when reading all of
its chunk into memory.
- // Not only reading chunk into chunk cache, but also need to deserialize
data point into merge
- // reader, so we have to double the cost here.
- if (fileInfo[0] == 0) { // If totalChunkNum ==0, i.e. this unSeq tsFile
has no chunk.
+ int concurrentSeriesNum =
+ fileInfo.maxAlignedSeriesNumInDevice == -1
+ ? subCompactionTaskNum
+ : fileInfo.maxAlignedSeriesNumInDevice;
+ maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum,
concurrentSeriesNum);
+ if (fileInfo.totalChunkNum == 0) { // If totalChunkNum ==0, i.e. this
unSeq tsFile has no chunk.
logger.warn(
"calculateReadingUnseqFile(), find 1 empty unSeq tsFile: {}.",
unseqResource.getTsFilePath());
return 0;
}
- return 2 * concurrentSeriesNum * (unseqResource.getTsFileSize() *
fileInfo[1] / fileInfo[0]);
+ // it means the max size of a timeseries in this file when reading all of
its chunks into memory.
+ return compressionRatio
+ * concurrentSeriesNum
+ * (unseqResource.getTsFileSize() * fileInfo.maxSeriesChunkNum /
fileInfo.totalChunkNum);
}
/**
- * Calculate memory cost of reading source seq files in the cross space
compaction. Double the
+ * Calculate memory cost of reading source seq files in the cross space
compaction. Select the
* maximum size of the timeseries to be compacted at the same time in one
seq file, because only
* one seq file will be queried at the same time.
*/
@@ -101,38 +98,41 @@ public class RewriteCrossCompactionEstimator extends
AbstractCrossSpaceEstimator
long cost = 0;
for (TsFileResource seqResource : seqResources) {
TsFileSequenceReader reader = getFileReader(seqResource);
- int[] fileInfo = getSeriesAndDeviceChunkNum(reader);
+ FileInfo fileInfo = getSeriesAndDeviceChunkNum(reader);
// it is max aligned series num of one device when tsfile contains
aligned series,
// else is sub compaction task num.
- int concurrentSeriesNum = fileInfo[2] == -1 ? subCompactionTaskNum :
fileInfo[2];
- long seqFileCost = 0;
- if (fileInfo[0] == 0) { // If totalChunkNum ==0, i.e. this seq tsFile
has no chunk.
+ int concurrentSeriesNum =
+ fileInfo.maxAlignedSeriesNumInDevice == -1
+ ? subCompactionTaskNum
+ : fileInfo.maxAlignedSeriesNumInDevice;
+ maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum,
concurrentSeriesNum);
+ long seqFileCost;
+ if (fileInfo.totalChunkNum == 0) { // If totalChunkNum ==0, i.e. this
seq tsFile has no chunk.
logger.warn(
"calculateReadingSeqFiles(), find 1 empty seq tsFile: {}.",
seqResource.getTsFilePath());
seqFileCost = 0;
} else {
- seqFileCost =
- concurrentSeriesNum * (seqResource.getTsFileSize() * fileInfo[1] /
fileInfo[0]);
+ // We need to multiply the compression ratio here.
+ seqFileCost = compressionRatio * concurrentSeriesNum *
config.getTargetChunkSize();
}
if (seqFileCost > maxCostOfReadingSeqFile) {
// Only one seq file will be read at the same time.
// not only reading chunk into chunk cache, but also need to
deserialize data point into
- // merge reader, so we have to double the cost here.
- cost -= 2 * maxCostOfReadingSeqFile;
- cost += 2 * seqFileCost;
+ // merge reader. We have to add the cost in merge reader here and the
cost of chunk cache is
+ // unnecessary.
+ cost -= maxCostOfReadingSeqFile;
+ cost += seqFileCost;
maxCostOfReadingSeqFile = seqFileCost;
}
- maxSeqChunkNumInDeviceList.add(new Pair<>(fileInfo[3], fileInfo[0]));
}
return cost;
}
/**
* Calculate memory cost of writing target files in the cross space
compaction. Including metadata
- * size of all seq files, max chunk group size of each seq file and max
chunk group size of
- * corresponding overlapped unseq file.
+ * size of all source files and size of concurrent target chunks.
*/
private long calculatingWritingTargetFiles(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws
IOException {
@@ -141,17 +141,16 @@ public class RewriteCrossCompactionEstimator extends
AbstractCrossSpaceEstimator
TsFileSequenceReader reader = getFileReader(seqResource);
// add seq file metadata size
cost += reader.getFileMetadataSize();
- // add max chunk group size of this seq tsfile
- int totalSeqChunkNum = maxSeqChunkNumInDeviceList.get(0).right;
- if (totalSeqChunkNum > 0) {
- cost +=
- seqResource.getTsFileSize() *
maxSeqChunkNumInDeviceList.get(0).left / totalSeqChunkNum;
- }
}
- // add max chunk group size of overlapped unseq tsfile
- int totalUnSeqChunkNum = maxUnseqChunkNumInDevice.right;
- if (totalUnSeqChunkNum > 0) {
- cost += unseqResource.getTsFileSize() * maxUnseqChunkNumInDevice.left /
totalUnSeqChunkNum;
+ // add unseq file metadata size
+ cost += getFileReader(unseqResource).getFileMetadataSize();
+
+ // concurrent series chunk size
+ long writingTargetCost = maxConcurrentSeriesNum *
config.getTargetChunkSize();
+ if (writingTargetCost > maxCostOfWritingTargetFile) {
+ cost -= maxCostOfWritingTargetFile;
+ cost += writingTargetCost;
+ maxCostOfWritingTargetFile = writingTargetCost;
}
return cost;
@@ -169,7 +168,7 @@ public class RewriteCrossCompactionEstimator extends
AbstractCrossSpaceEstimator
*
* <p>max chunk num of one device in this tsfile
*/
- private int[] getSeriesAndDeviceChunkNum(TsFileSequenceReader reader) throws
IOException {
+ private FileInfo getSeriesAndDeviceChunkNum(TsFileSequenceReader reader)
throws IOException {
int totalChunkNum = 0;
int maxChunkNum = 0;
int maxAlignedSeriesNumInDevice = -1;
@@ -190,6 +189,29 @@ public class RewriteCrossCompactionEstimator extends
AbstractCrossSpaceEstimator
}
maxDeviceChunkNum = Math.max(maxDeviceChunkNum, deviceChunkNum);
}
- return new int[] {totalChunkNum, maxChunkNum, maxAlignedSeriesNumInDevice,
maxDeviceChunkNum};
+ return new FileInfo(totalChunkNum, maxChunkNum,
maxAlignedSeriesNumInDevice, maxDeviceChunkNum);
+ }
+
+ private class FileInfo {
+ // total chunk num in this tsfile
+ public int totalChunkNum = 0;
+ // max chunk num of one timeseries in this tsfile
+ public int maxSeriesChunkNum = 0;
+ // max aligned series num in one device. If there is no aligned series in
this file, then it
+ // is -1.
+ public int maxAlignedSeriesNumInDevice = -1;
+ // max chunk num of one device in this tsfile
+ public int maxDeviceChunkNum = 0;
+
+ public FileInfo(
+ int totalChunkNum,
+ int maxSeriesChunkNum,
+ int maxAlignedSeriesNumInDevice,
+ int maxDeviceChunkNum) {
+ this.totalChunkNum = totalChunkNum;
+ this.maxSeriesChunkNum = maxSeriesChunkNum;
+ this.maxAlignedSeriesNumInDevice = maxAlignedSeriesNumInDevice;
+ this.maxDeviceChunkNum = maxDeviceChunkNum;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 71050ec1b7..e99a2c36ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -278,6 +278,9 @@ public class SystemInfo {
}
public void addCompactionMemoryCost(long memoryCost) throws
InterruptedException {
+ if (!config.isEnableCompactionMemControl()) {
+ return;
+ }
long originSize = this.compactionMemoryCost.get();
while (originSize + memoryCost > memorySizeForCompaction
|| !compactionMemoryCost.compareAndSet(originSize, originSize +
memoryCost)) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
index 2d20e3220e..0b50e9392b 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
@@ -101,8 +101,8 @@ public class RewriteCompactionFileSelectorTest extends
MergeTest {
List[] result = mergeFileSelector.select();
List<TsFileResource> seqSelected = result[0];
List<TsFileResource> unseqSelected = result[1];
- assertEquals(seqResources.subList(0, 3), seqSelected);
- assertEquals(unseqResources.subList(0, 3), unseqSelected);
+ assertEquals(seqResources.subList(0, 1), seqSelected);
+ assertEquals(unseqResources.subList(0, 1), unseqSelected);
resource.clear();
}