This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 1b67ec52720 [to dev/1.3] Cherry pick 'estimating inner compaction task
memory during selection (#15257)' & Fix estimating memory cost for string type
in compaction (#16048)
1b67ec52720 is described below
commit 1b67ec527209e2417da437713fe4a4c578b0ee18
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 29 14:22:10 2025 +0800
[to dev/1.3] Cherry pick 'estimating inner compaction task memory during
selection (#15257)' & Fix estimating memory cost for string type in compaction
(#16048)
* estimating inner compaction task memory during selection (#15257)
* estimating inner compaction task during selection
* use tsfile id
* rename
* spotless
* modify CompactionEstimateUtils
* fix bug
* fix compile
* fix ut
* fix bug
* fix ut
* fix review
* modify CompactionEstimateUtils
* modify ut
* fix ut
* add log
* add log
* fix estimating memory cost for string type
* modify SystemInfo
* remove Collections.synchronizedMap
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 --
.../performer/ICrossCompactionPerformer.java | 6 ++
...rformer.java => IInnerCompactionPerformer.java} | 11 +-
.../execute/performer/ISeqCompactionPerformer.java | 2 +-
.../performer/IUnseqCompactionPerformer.java | 2 +-
.../performer/impl/FastCompactionPerformer.java | 21 ++++
.../impl/ReadChunkCompactionPerformer.java | 8 ++
.../impl/ReadPointCompactionPerformer.java | 8 ++
.../execute/task/AbstractCompactionTask.java | 10 ++
.../execute/task/InnerSpaceCompactionTask.java | 18 +---
.../schedule/CompactionScheduleContext.java | 24 +++++
.../compaction/schedule/CompactionScheduler.java | 5 +-
.../estimator/AbstractCompactionEstimator.java | 120 ++++++++++++++++++---
.../estimator/AbstractCrossSpaceEstimator.java | 6 +-
.../estimator/AbstractInnerSpaceEstimator.java | 5 +-
.../estimator/CompactionEstimateUtils.java | 96 +++++++++++++++--
.../selector/estimator/CompactionTaskInfo.java | 6 --
...taInfo.java => CompactionTaskMetadataInfo.java} | 12 ++-
.../FastCompactionInnerCompactionEstimator.java | 44 +++++---
.../FastCrossSpaceCompactionEstimator.java | 29 +++--
.../compaction/selector/estimator/FileInfo.java | 39 ++++++-
.../ReadChunkInnerCompactionEstimator.java | 25 +++--
.../RepairUnsortedFileCompactionEstimator.java | 21 +++-
.../impl/NewSizeTieredCompactionSelector.java | 76 ++++++++++++-
.../impl/RewriteCrossSpaceCompactionSelector.java | 2 +-
.../db/storageengine/rescon/memory/SystemInfo.java | 6 ++
.../utils/CompactionTaskMemCostEstimatorTest.java | 87 +++++++++------
27 files changed, 559 insertions(+), 141 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 890af218a59..007f45755ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -587,9 +587,6 @@ public class IoTDBConfig {
*/
private volatile double innerCompactionTaskSelectionDiskRedundancy = 0.05;
- /** The size of global compaction estimation file info cahce. */
- private int globalCompactionFileInfoCacheSize = 1000;
-
/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
private boolean metaDataCacheEnable = true;
@@ -3949,14 +3946,6 @@ public class IoTDBConfig {
this.candidateCompactionTaskQueueSize = candidateCompactionTaskQueueSize;
}
- public int getGlobalCompactionFileInfoCacheSize() {
- return globalCompactionFileInfoCacheSize;
- }
-
- public void setGlobalCompactionFileInfoCacheSize(int
globalCompactionFileInfoCacheSize) {
- this.globalCompactionFileInfoCacheSize = globalCompactionFileInfoCacheSize;
- }
-
public boolean isEnableAuditLog() {
return enableAuditLog;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java
index 9aad2d0b5e4..d9b9f9fbf09 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java
@@ -19,11 +19,17 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCrossSpaceEstimator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.util.List;
+import java.util.Optional;
public interface ICrossCompactionPerformer extends ICompactionPerformer {
@Override
void setSourceFiles(List<TsFileResource> seqFiles, List<TsFileResource>
unseqFiles);
+
+ default Optional<AbstractCrossSpaceEstimator> getCrossSpaceEstimator() {
+ return Optional.empty();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IInnerCompactionPerformer.java
similarity index 74%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IInnerCompactionPerformer.java
index 9aad2d0b5e4..04118a1475b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IInnerCompactionPerformer.java
@@ -19,11 +19,12 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
-import java.util.List;
+import java.util.Optional;
-public interface ICrossCompactionPerformer extends ICompactionPerformer {
- @Override
- void setSourceFiles(List<TsFileResource> seqFiles, List<TsFileResource>
unseqFiles);
+public interface IInnerCompactionPerformer extends ICompactionPerformer {
+ default Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() {
+ return Optional.empty();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ISeqCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ISeqCompactionPerformer.java
index 30d7264fdcd..3416d587d05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ISeqCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ISeqCompactionPerformer.java
@@ -23,7 +23,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.util.List;
-public interface ISeqCompactionPerformer extends ICompactionPerformer {
+public interface ISeqCompactionPerformer extends IInnerCompactionPerformer {
@Override
void setSourceFiles(List<TsFileResource> seqFiles);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IUnseqCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IUnseqCompactionPerformer.java
index 9c9e11538b1..686ec14fe72 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IUnseqCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IUnseqCompactionPerformer.java
@@ -23,7 +23,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.util.List;
-public interface IUnseqCompactionPerformer extends ICompactionPerformer {
+public interface IUnseqCompactionPerformer extends IInnerCompactionPerformer {
@Override
void setSourceFiles(List<TsFileResource> unseqFiles);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
index fc8392572d2..b8347c5b91e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
@@ -39,6 +39,10 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastCrossCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastInnerCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCrossSpaceEstimator;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCrossSpaceCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -61,6 +65,7 @@ import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -364,4 +369,20 @@ public class FastCompactionPerformer
modificationCache.put(resource.getTsFile().getName(), modifications);
}
}
+
+ public String getDatabaseName() {
+ return !seqFiles.isEmpty()
+ ? seqFiles.get(0).getDatabaseName()
+ : unseqFiles.get(0).getDatabaseName();
+ }
+
+ @Override
+ public Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() {
+ return Optional.of(new FastCompactionInnerCompactionEstimator());
+ }
+
+ @Override
+ public Optional<AbstractCrossSpaceEstimator> getCrossSpaceEstimator() {
+ return Optional.of(new FastCrossSpaceCompactionEstimator());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index 670f91961c6..f790d4e606c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -30,6 +30,8 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.exe
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -45,6 +47,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
private List<TsFileResource> seqFiles;
@@ -286,4 +289,9 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
public void setSourceFiles(List<TsFileResource> seqFiles) {
this.seqFiles = seqFiles;
}
+
+ @Override
+ public Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() {
+ return Optional.of(new ReadChunkInnerCompactionEstimator());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
index 064cdecf841..eb868a3a714 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
@@ -38,6 +38,8 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.ReadPointCrossCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.ReadPointInnerCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.RepairUnsortedFileCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -61,6 +63,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -323,4 +326,9 @@ public class ReadPointCompactionPerformer
public void setSourceFiles(List<TsFileResource> unseqFiles) {
this.unseqFiles = unseqFiles;
}
+
+ @Override
+ public Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() {
+ return Optional.of(new RepairUnsortedFileCompactionEstimator());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index f58f2d0ef3e..8d7bbd2d14d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -74,6 +74,7 @@ public abstract class AbstractCompactionTask {
protected CompactionTaskSummary summary;
protected long serialId;
protected CompactionTaskStage taskStage;
+ protected long roughMemoryCost = -1L;
protected long memoryCost = 0L;
protected boolean recoverMemoryStatus;
@@ -256,6 +257,15 @@ public abstract class AbstractCompactionTask {
}
}
+ @TestOnly
+ public long getRoughMemoryCost() {
+ return roughMemoryCost;
+ }
+
+ public void setRoughMemoryCost(long memoryCost) {
+ this.roughMemoryCost = memoryCost;
+ }
+
public abstract long getEstimatedMemoryCost();
public abstract int getProcessedFileNum();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 25674209023..a9dfce2bad3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -28,8 +28,8 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IInnerCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
@@ -39,8 +39,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.CompactionEstimateUtils;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -674,20 +672,14 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
@Override
public long getEstimatedMemoryCost() {
if (innerSpaceEstimator == null) {
- if (this.performer instanceof ReadChunkCompactionPerformer) {
- innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
- } else if (this.performer instanceof FastCompactionPerformer) {
- innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
- }
+ innerSpaceEstimator =
+ ((IInnerCompactionPerformer)
this.performer).getInnerSpaceEstimator().orElse(null);
}
if (innerSpaceEstimator != null && memoryCost == 0L) {
try {
- long roughEstimatedMemoryCost =
- innerSpaceEstimator.roughEstimateInnerCompactionMemory(
- filesView.sourceFilesInCompactionPerformer);
memoryCost =
-
CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost)
- ? roughEstimatedMemoryCost
+
CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughMemoryCost)
+ ? roughMemoryCost
: innerSpaceEstimator.estimateInnerCompactionMemory(
filesView.sourceFilesInCompactionPerformer);
} catch (CompactionSourceFileDeletedException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
index d1b9f134527..6f215ab579a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
@@ -19,8 +19,12 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IUnseqCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -169,4 +173,24 @@ public class CompactionScheduleContext {
public int getPartiallyDirtyFileNum() {
return partiallyDirtyFileNum;
}
+
+ public ISeqCompactionPerformer getSeqCompactionPerformer() {
+ return IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerSeqCompactionPerformer()
+ .createInstance();
+ }
+
+ public IUnseqCompactionPerformer getUnseqCompactionPerformer() {
+ IUnseqCompactionPerformer unseqCompactionPerformer =
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerUnseqCompactionPerformer()
+ .createInstance();
+ return unseqCompactionPerformer;
+ }
+
+ public ICrossCompactionPerformer getCrossCompactionPerformer() {
+ return
IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer().createInstance();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index f2c8e789ab1..4f762ad4306 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -301,10 +301,7 @@ public class CompactionScheduler {
tsFileManager,
taskList.get(i).getSeqFiles(),
taskList.get(i).getUnseqFiles(),
- IoTDBDescriptor.getInstance()
- .getConfig()
- .getCrossCompactionPerformer()
- .createInstance(),
+ context.getCrossCompactionPerformer(),
memoryCost.get(i),
tsFileManager.getNextCompactionTaskId());
task.setCompactionConfigVersion(compactionConfigVersionWhenSelectTask);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
index fe73090b13c..ba6bfa9722b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
@@ -19,10 +19,13 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
@@ -36,7 +39,8 @@ import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.TsFileSequenceReader;
-import java.io.File;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -55,14 +59,40 @@ import java.util.stream.Collectors;
@SuppressWarnings("OptionalGetWithoutIsPresent")
public abstract class AbstractCompactionEstimator {
- private static final Map<File, FileInfo>
globalFileInfoCacheForFailedCompaction =
- Collections.synchronizedMap(
- new LRUMap<>(
-
IoTDBDescriptor.getInstance().getConfig().getGlobalCompactionFileInfoCacheSize()));
+ /** The size of global compaction estimation file info cahce. */
+ private static int globalCompactionFileInfoCacheSize = 1000;
+
+ /** The size of global compaction estimation rough file info cahce. */
+ private static int globalCompactionRoughFileInfoCacheSize = 100000;
+
+ private static final double maxRatioToAllocateFileInfoCache = 0.1;
+ private static boolean globalFileInfoCacheEnabled;
+ private static Map<TsFileID, FileInfo>
globalFileInfoCacheForFailedCompaction;
+ private static Map<TsFileID, FileInfo.RoughFileInfo>
globalRoughInfoCacheForCompaction;
+
+ protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ public static long allocateMemoryCostForFileInfoCache(long
compactionMemorySize) {
+ long fixedMemoryCost =
+ globalCompactionFileInfoCacheSize *
FileInfo.MEMORY_COST_OF_FILE_INFO_ENTRY_IN_CACHE
+ + globalCompactionRoughFileInfoCacheSize
+ * FileInfo.MEMORY_COST_OF_ROUGH_FILE_INFO_ENTRY_IN_CACHE;
+ globalFileInfoCacheEnabled =
+ compactionMemorySize * maxRatioToAllocateFileInfoCache >
fixedMemoryCost;
+ if (globalFileInfoCacheEnabled) {
+ globalRoughInfoCacheForCompaction = new
LRUMap<>(globalCompactionFileInfoCacheSize);
+ globalFileInfoCacheForFailedCompaction = new
LRUMap<>(globalCompactionRoughFileInfoCacheSize);
+ } else {
+ globalRoughInfoCacheForCompaction = Collections.emptyMap();
+ globalFileInfoCacheForFailedCompaction = Collections.emptyMap();
+ }
+ return globalFileInfoCacheEnabled ? fixedMemoryCost : 0;
+ }
+
protected Map<TsFileResource, FileInfo> fileInfoCache = new HashMap<>();
+ protected Map<TsFileResource, FileInfo.RoughFileInfo> roughInfoMap = new
HashMap<>();
protected Map<TsFileResource, DeviceTimeIndex> deviceTimeIndexCache = new
HashMap<>();
- protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
protected TSFileConfig tsFileConfig =
TSFileDescriptor.getInstance().getConfig();
protected long fixedMemoryBudget =
(long)
@@ -100,10 +130,10 @@ public abstract class AbstractCompactionEstimator {
if (fileInfoCache.containsKey(resource)) {
return fileInfoCache.get(resource);
}
- File file = new File(resource.getTsFilePath());
+ TsFileID tsFileID = resource.getTsFileID();
synchronized (globalFileInfoCacheForFailedCompaction) {
- if (globalFileInfoCacheForFailedCompaction.containsKey(file)) {
- FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(file);
+ FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(tsFileID);
+ if (fileInfo != null) {
fileInfoCache.put(resource, fileInfo);
return fileInfo;
}
@@ -111,19 +141,26 @@ public abstract class AbstractCompactionEstimator {
try (TsFileSequenceReader reader = getReader(resource.getTsFilePath())) {
FileInfo fileInfo = CompactionEstimateUtils.calculateFileInfo(reader);
fileInfoCache.put(resource, fileInfo);
- synchronized (globalFileInfoCacheForFailedCompaction) {
- globalFileInfoCacheForFailedCompaction.put(file, fileInfo);
+ if (globalFileInfoCacheEnabled) {
+ synchronized (globalFileInfoCacheForFailedCompaction) {
+ globalFileInfoCacheForFailedCompaction.put(tsFileID, fileInfo);
+ }
+ synchronized (globalRoughInfoCacheForCompaction) {
+ globalRoughInfoCacheForCompaction.put(tsFileID,
fileInfo.getSimpleFileInfo());
+ }
}
return fileInfo;
}
}
- protected int
calculatingMaxOverlapFileNumInSubCompactionTask(List<TsFileResource> resources)
+ @SuppressWarnings("OptionalGetWithoutIsPresent")
+ protected int calculatingMaxOverlapFileNumInSubCompactionTask(
+ @Nullable CompactionScheduleContext context, List<TsFileResource>
resources)
throws IOException {
Set<IDeviceID> devices = new HashSet<>();
List<DeviceTimeIndex> resourceDevices = new ArrayList<>(resources.size());
for (TsFileResource resource : resources) {
- DeviceTimeIndex deviceTimeIndex = getDeviceTimeIndexFromCache(resource);
+ DeviceTimeIndex deviceTimeIndex = getDeviceTimeIndexFromCache(context,
resource);
devices.addAll(deviceTimeIndex.getDevices());
resourceDevices.add(deviceTimeIndex);
}
@@ -166,10 +203,18 @@ public abstract class AbstractCompactionEstimator {
return maxOverlapFileNumInSubCompactionTask;
}
- private DeviceTimeIndex getDeviceTimeIndexFromCache(TsFileResource resource)
throws IOException {
+ private DeviceTimeIndex getDeviceTimeIndexFromCache(
+ @Nullable CompactionScheduleContext context, TsFileResource resource)
throws IOException {
if (deviceTimeIndexCache.containsKey(resource)) {
return deviceTimeIndexCache.get(resource);
}
+ if (context != null) {
+ DeviceTimeIndex timeIndex = context.getResourceDeviceInfo(resource);
+ if (timeIndex != null) {
+ deviceTimeIndexCache.put(resource, timeIndex);
+ return timeIndex;
+ }
+ }
ITimeIndex timeIndex = resource.getTimeIndex();
if (timeIndex instanceof FileTimeIndex) {
timeIndex = CompactionUtils.buildDeviceTimeIndex(resource);
@@ -183,12 +228,55 @@ public abstract class AbstractCompactionEstimator {
fileInfoCache.clear();
}
+ public boolean hasCachedRoughFileInfo(TsFileResource resource) {
+ return getRoughFileInfo(resource) != null;
+ }
+
+ public FileInfo.RoughFileInfo getRoughFileInfo(TsFileResource resource) {
+ FileInfo.RoughFileInfo roughFileInfo = roughInfoMap.get(resource);
+ if (roughFileInfo != null) {
+ return roughFileInfo;
+ }
+ synchronized (globalRoughInfoCacheForCompaction) {
+ roughFileInfo =
globalRoughInfoCacheForCompaction.get(resource.getTsFileID());
+ }
+ if (roughFileInfo != null) {
+ roughInfoMap.put(resource, roughFileInfo);
+ }
+ return roughFileInfo;
+ }
+
public static void removeFileInfoFromGlobalFileInfoCache(TsFileResource
resource) {
if (resource == null || resource.getTsFile() == null) {
return;
}
- synchronized (globalFileInfoCacheForFailedCompaction) {
- globalFileInfoCacheForFailedCompaction.remove(resource.getTsFile());
+ if (globalFileInfoCacheEnabled) {
+ synchronized (globalFileInfoCacheForFailedCompaction) {
+ globalFileInfoCacheForFailedCompaction.remove(resource.getTsFileID());
+ }
+ synchronized (globalRoughInfoCacheForCompaction) {
+ globalRoughInfoCacheForCompaction.remove(resource.getTsFileID());
+ }
}
}
+
+ @TestOnly
+ public static void enableFileInfoCacheForTest(
+ int globalCompactionFileInfoCacheSize, int
globalCompactionRoughFileInfoCacheSize) {
+ globalFileInfoCacheEnabled = true;
+ globalRoughInfoCacheForCompaction = new
LRUMap<>(globalCompactionFileInfoCacheSize);
+ globalFileInfoCacheForFailedCompaction = new
LRUMap<>(globalCompactionRoughFileInfoCacheSize);
+ }
+
+ @TestOnly
+ public static void disableFileInfoCacheForTest() {
+ globalFileInfoCacheEnabled = false;
+ globalRoughInfoCacheForCompaction = Collections.emptyMap();
+ globalFileInfoCacheForFailedCompaction = Collections.emptyMap();
+ }
+
+ @TestOnly
+ public static boolean isGlobalFileInfoCacheEnabled() {
+ return globalFileInfoCacheEnabled;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
index 3b6db3127b1..76d52c6ed75 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
@@ -20,6 +20,7 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -59,5 +60,8 @@ public abstract class AbstractCrossSpaceEstimator extends
AbstractCompactionEsti
}
public abstract long roughEstimateCrossCompactionMemory(
- List<TsFileResource> seqResources, List<TsFileResource> unseqResources)
throws IOException;
+ CompactionScheduleContext context,
+ List<TsFileResource> seqResources,
+ List<TsFileResource> unseqResources)
+ throws IOException;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
index 21288883ae8..40912afdffe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimat
import org.apache.iotdb.commons.conf.IoTDBConstant;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -57,6 +58,6 @@ public abstract class AbstractInnerSpaceEstimator extends
AbstractCompactionEsti
return cost;
}
- public abstract long roughEstimateInnerCompactionMemory(List<TsFileResource>
resources)
- throws IOException;
+ public abstract long roughEstimateInnerCompactionMemory(
+ CompactionScheduleContext context, List<TsFileResource> resources)
throws IOException;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
index c332f72dc54..1027f49d654 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
@@ -19,6 +19,7 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
@@ -26,12 +27,18 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.statistics.BinaryStatistics;
+import org.apache.tsfile.file.metadata.statistics.StringStatistics;
import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
@@ -41,6 +48,9 @@ import java.util.Map;
public class CompactionEstimateUtils {
+ protected static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+
/**
* Get the details of the tsfile, the returned array contains the following
elements in sequence:
*
@@ -55,12 +65,15 @@ public class CompactionEstimateUtils {
*
* @throws IOException if io errors occurred
*/
- public static FileInfo calculateFileInfo(TsFileSequenceReader reader) throws
IOException {
+ static FileInfo calculateFileInfo(TsFileSequenceReader reader) throws
IOException {
int totalChunkNum = 0;
int maxChunkNum = 0;
int maxAlignedSeriesNumInDevice = -1;
int maxDeviceChunkNum = 0;
+ long maxMemCostToReadAlignedSeriesMetadata = 0;
+ long maxMemCostToReadNonAlignedSeriesMetadata = 0;
TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
+ long totalMetadataSize = 0;
while (deviceIterator.hasNext()) {
int deviceChunkNum = 0;
int alignedSeriesNumInDevice = 0;
@@ -68,6 +81,7 @@ public class CompactionEstimateUtils {
Pair<IDeviceID, Boolean> deviceWithIsAlignedPair = deviceIterator.next();
IDeviceID device = deviceWithIsAlignedPair.left;
boolean isAlignedDevice = deviceWithIsAlignedPair.right;
+ long memCostToReadMetadata = 0;
Iterator<Map<String, List<ChunkMetadata>>>
measurementChunkMetadataListMapIterator =
reader.getMeasurementChunkMetadataListMapIterator(device);
@@ -81,9 +95,49 @@ public class CompactionEstimateUtils {
for (Map.Entry<String, List<ChunkMetadata>>
measurementChunkMetadataList :
measurementChunkMetadataListMap.entrySet()) {
int currentChunkMetadataListSize =
measurementChunkMetadataList.getValue().size();
+ long measurementNameRamSize =
+ RamUsageEstimator.sizeOf(measurementChunkMetadataList.getKey());
+ long chunkMetadataMemCost = 0;
+ long currentSeriesRamSize = measurementNameRamSize;
+ for (ChunkMetadata chunkMetadata :
measurementChunkMetadataList.getValue()) {
+ // chunkMetadata should not be a null value
+ if (chunkMetadata != null) {
+ TSDataType dataType = chunkMetadata.getDataType();
+ chunkMetadataMemCost =
+ chunkMetadataMemCost != 0
+ ? chunkMetadataMemCost
+ :
(ChunkMetadata.calculateRamSize(chunkMetadata.getMeasurementUid(), dataType)
+ - measurementNameRamSize);
+ if (dataType == TSDataType.TEXT) {
+ // add ram size for first value and last value
+ currentSeriesRamSize +=
+ chunkMetadata.getStatistics().getRetainedSizeInBytes()
+ - BinaryStatistics.INSTANCE_SIZE;
+ } else if (dataType == TSDataType.STRING) {
+ currentSeriesRamSize +=
+ chunkMetadata.getStatistics().getRetainedSizeInBytes()
+ - StringStatistics.INSTANCE_SIZE;
+ } else {
+ break;
+ }
+ } else {
+ LOGGER.warn(
+ "{} has null chunk metadata, file is {}",
+ device.toString() + "." +
measurementChunkMetadataList.getKey(),
+ reader.getFileName());
+ }
+ }
+ currentSeriesRamSize += chunkMetadataMemCost *
currentChunkMetadataListSize;
+ if (isAlignedDevice) {
+ memCostToReadMetadata += currentSeriesRamSize;
+ } else {
+ maxMemCostToReadNonAlignedSeriesMetadata =
+ Math.max(maxMemCostToReadNonAlignedSeriesMetadata,
currentSeriesRamSize);
+ }
deviceChunkNum += currentChunkMetadataListSize;
totalChunkNum += currentChunkMetadataListSize;
maxChunkNum = Math.max(maxChunkNum, currentChunkMetadataListSize);
+ totalMetadataSize += currentSeriesRamSize;
}
}
if (isAlignedDevice) {
@@ -91,21 +145,24 @@ public class CompactionEstimateUtils {
Math.max(maxAlignedSeriesNumInDevice, alignedSeriesNumInDevice);
}
maxDeviceChunkNum = Math.max(maxDeviceChunkNum, deviceChunkNum);
+ maxMemCostToReadAlignedSeriesMetadata =
+ Math.max(maxMemCostToReadAlignedSeriesMetadata,
memCostToReadMetadata);
}
- long averageChunkMetadataSize =
- totalChunkNum == 0 ? 0 : reader.getAllMetadataSize() / totalChunkNum;
+ long averageChunkMetadataSize = totalChunkNum == 0 ? 0 : totalMetadataSize
/ totalChunkNum;
return new FileInfo(
totalChunkNum,
maxChunkNum,
maxAlignedSeriesNumInDevice,
maxDeviceChunkNum,
- averageChunkMetadataSize);
+ averageChunkMetadataSize,
+ maxMemCostToReadAlignedSeriesMetadata,
+ maxMemCostToReadNonAlignedSeriesMetadata);
}
- static MetadataInfo collectMetadataInfo(List<TsFileResource> resources,
CompactionType taskType)
- throws IOException {
+ static CompactionTaskMetadataInfo collectMetadataInfoFromDisk(
+ List<TsFileResource> resources, CompactionType taskType) throws
IOException {
CompactionEstimateUtils.addReadLock(resources);
- MetadataInfo metadataInfo = new MetadataInfo();
+ CompactionTaskMetadataInfo metadataInfo = new CompactionTaskMetadataInfo();
long cost = 0L;
Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
try {
@@ -129,8 +186,31 @@ public class CompactionEstimateUtils {
}
}
+ static CompactionTaskMetadataInfo collectMetadataInfoFromCachedFileInfo(
+ List<TsFileResource> resources,
+ Map<TsFileResource, FileInfo.RoughFileInfo> cachedFileInfo,
+ boolean hasConcurrentSubTask) {
+ CompactionTaskMetadataInfo metadataInfo = new CompactionTaskMetadataInfo();
+ for (TsFileResource resource : resources) {
+ metadataInfo.metadataMemCost += resource.getModFile().getSize();
+ long maxMemToReadAlignedSeries =
cachedFileInfo.get(resource).maxMemToReadAlignedSeries;
+ long maxMemToReadNonAlignedSeries =
cachedFileInfo.get(resource).maxMemToReadNonAlignedSeries;
+ metadataInfo.metadataMemCost +=
+ Math.max(
+ maxMemToReadAlignedSeries,
+ maxMemToReadNonAlignedSeries
+ * (hasConcurrentSubTask
+ ?
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum()
+ : 1));
+ if (maxMemToReadAlignedSeries > 0) {
+ metadataInfo.hasAlignedSeries = true;
+ }
+ }
+ return metadataInfo;
+ }
+
static Map<IDeviceID, Long> getDeviceMetadataSizeMapAndCollectMetadataInfo(
- CompactionTsFileReader reader, MetadataInfo metadataInfo) throws
IOException {
+ CompactionTsFileReader reader, CompactionTaskMetadataInfo metadataInfo)
throws IOException {
Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
while (deviceIterator.hasNext()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java
index 32803fb944f..7328d05ea54 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java
@@ -34,7 +34,6 @@ public class CompactionTaskInfo {
private long modificationFileSize = 0;
private long totalFileSize = 0;
private long totalChunkNum = 0;
- private long totalChunkMetadataSize = 0;
protected CompactionTaskInfo(List<TsFileResource> resources, List<FileInfo>
fileInfoList) {
this.fileInfoList = fileInfoList;
@@ -55,7 +54,6 @@ public class CompactionTaskInfo {
Math.max(maxChunkMetadataNumInDevice, fileInfo.maxDeviceChunkNum);
maxChunkMetadataSize = Math.max(maxChunkMetadataSize,
fileInfo.averageChunkMetadataSize);
totalChunkNum += fileInfo.totalChunkNum;
- totalChunkMetadataSize += fileInfo.totalChunkNum *
fileInfo.averageChunkMetadataSize;
}
}
@@ -94,8 +92,4 @@ public class CompactionTaskInfo {
public List<TsFileResource> getResources() {
return resources;
}
-
- public long getTotalChunkMetadataSize() {
- return totalChunkMetadataSize;
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskMetadataInfo.java
similarity index 81%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskMetadataInfo.java
index a6474a599f4..ac72978b7dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskMetadataInfo.java
@@ -21,11 +21,15 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimat
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-class MetadataInfo {
+class CompactionTaskMetadataInfo {
public long metadataMemCost;
public boolean hasAlignedSeries;
- public int getMaxConcurrentSeriesNum() {
+ public int getMaxConcurrentSeriesNum(boolean hasConcurrentSubTask) {
+ int subTaskNum =
+ hasConcurrentSubTask
+ ?
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum()
+ : 1;
if (!hasAlignedSeries) {
return
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
}
@@ -35,8 +39,6 @@ class MetadataInfo {
compactionMaxAlignedSeriesNumInOneBatch <= 0
? Integer.MAX_VALUE
: compactionMaxAlignedSeriesNumInOneBatch;
- return Math.max(
- compactionMaxAlignedSeriesNumInOneBatch,
- IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum());
+ return Math.max(compactionMaxAlignedSeriesNumInOneBatch, subTaskNum);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
index d7b3933b38d..d69c0b38f4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
@@ -19,9 +19,11 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.List;
@@ -31,9 +33,19 @@ public class FastCompactionInnerCompactionEstimator extends
AbstractInnerSpaceEs
public long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) {
long cost = 0;
// add ChunkMetadata size of MultiTsFileDeviceIterator
+ long maxAlignedSeriesMemCost =
+ taskInfo.getFileInfoList().stream()
+ .mapToLong(fileInfo -> fileInfo.maxMemToReadAlignedSeries)
+ .sum();
+ long maxNonAlignedSeriesMemCost =
+ taskInfo.getFileInfoList().stream()
+ .mapToLong(
+ fileInfo ->
+ fileInfo.maxMemToReadNonAlignedSeries *
config.getSubCompactionTaskNum())
+ .sum();
cost +=
Math.min(
- taskInfo.getTotalChunkMetadataSize(),
+ Math.max(maxAlignedSeriesMemCost, maxNonAlignedSeriesMemCost),
taskInfo.getFileInfoList().size()
* taskInfo.getMaxChunkMetadataNumInDevice()
* taskInfo.getMaxChunkMetadataSize());
@@ -71,7 +83,7 @@ public class FastCompactionInnerCompactionEstimator extends
AbstractInnerSpaceEs
long maxConcurrentChunkSizeFromSourceFile =
(averageChunkSize + tsFileConfig.getPageSizeInByte())
* maxConcurrentSeriesNum
- *
calculatingMaxOverlapFileNumInSubCompactionTask(taskInfo.getResources());
+ * calculatingMaxOverlapFileNumInSubCompactionTask(null,
taskInfo.getResources());
return targetChunkWriterSize
+ maxConcurrentChunkSizeFromSourceFile
@@ -79,25 +91,33 @@ public class FastCompactionInnerCompactionEstimator extends
AbstractInnerSpaceEs
}
@Override
- public long roughEstimateInnerCompactionMemory(List<TsFileResource>
resources)
+ public long roughEstimateInnerCompactionMemory(
+ @Nullable CompactionScheduleContext context, List<TsFileResource>
resources)
throws IOException {
if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
return -1L;
}
- MetadataInfo metadataInfo =
- CompactionEstimateUtils.collectMetadataInfo(
- resources,
- resources.get(0).isSeq()
- ? CompactionType.INNER_SEQ_COMPACTION
- : CompactionType.INNER_UNSEQ_COMPACTION);
- int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
+ CompactionTaskMetadataInfo metadataInfo =
+ CompactionEstimateUtils.collectMetadataInfoFromCachedFileInfo(
+ resources, roughInfoMap, true);
+ int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum(true);
long maxChunkSize = config.getTargetChunkSize();
long maxPageSize = tsFileConfig.getPageSizeInByte();
- int maxOverlapFileNum =
calculatingMaxOverlapFileNumInSubCompactionTask(resources);
+ int maxOverlapFileNum =
calculatingMaxOverlapFileNumInSubCompactionTask(context, resources);
// source files (chunk + uncompressed page) * overlap file num
// target file (chunk + unsealed page writer)
return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize +
maxPageSize)
+ fixedMemoryBudget
+ metadataInfo.metadataMemCost;
}
+
+ @Override
+ protected int calculatingMaxOverlapFileNumInSubCompactionTask(
+ @Nullable CompactionScheduleContext context, List<TsFileResource>
resources)
+ throws IOException {
+ if (resources.get(0).isSeq()) {
+ return 1;
+ }
+ return super.calculatingMaxOverlapFileNumInSubCompactionTask(context,
resources);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
index 97f3aef7a8a..aaa9099bd7f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
@@ -19,6 +19,7 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -32,9 +33,19 @@ public class FastCrossSpaceCompactionEstimator extends
AbstractCrossSpaceEstimat
protected long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) {
long cost = 0;
// add ChunkMetadata size of MultiTsFileDeviceIterator
+ long maxAlignedSeriesMemCost =
+ taskInfo.getFileInfoList().stream()
+ .mapToLong(fileInfo -> fileInfo.maxMemToReadAlignedSeries)
+ .sum();
+ long maxNonAlignedSeriesMemCost =
+ taskInfo.getFileInfoList().stream()
+ .mapToLong(
+ fileInfo ->
+ fileInfo.maxMemToReadNonAlignedSeries *
config.getSubCompactionTaskNum())
+ .sum();
cost +=
Math.min(
- taskInfo.getTotalChunkMetadataSize(),
+ Math.max(maxAlignedSeriesMemCost, maxNonAlignedSeriesMemCost),
taskInfo.getFileInfoList().size()
* taskInfo.getMaxChunkMetadataNumInDevice()
* taskInfo.getMaxChunkMetadataSize());
@@ -73,7 +84,7 @@ public class FastCrossSpaceCompactionEstimator extends
AbstractCrossSpaceEstimat
long maxConcurrentChunkSizeFromSourceFile =
(averageChunkSize + tsFileConfig.getPageSizeInByte())
* maxConcurrentSeriesNum
- *
calculatingMaxOverlapFileNumInSubCompactionTask(taskInfo.getResources());
+ * calculatingMaxOverlapFileNumInSubCompactionTask(null,
taskInfo.getResources());
return targetChunkWriterSize
+ maxConcurrentChunkSizeFromSourceFile
@@ -82,7 +93,10 @@ public class FastCrossSpaceCompactionEstimator extends
AbstractCrossSpaceEstimat
@Override
public long roughEstimateCrossCompactionMemory(
- List<TsFileResource> seqResources, List<TsFileResource> unseqResources)
throws IOException {
+ CompactionScheduleContext context,
+ List<TsFileResource> seqResources,
+ List<TsFileResource> unseqResources)
+ throws IOException {
if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
return -1L;
}
@@ -90,13 +104,14 @@ public class FastCrossSpaceCompactionEstimator extends
AbstractCrossSpaceEstimat
sourceFiles.addAll(seqResources);
sourceFiles.addAll(unseqResources);
- MetadataInfo metadataInfo =
- CompactionEstimateUtils.collectMetadataInfo(sourceFiles,
CompactionType.CROSS_COMPACTION);
+ CompactionTaskMetadataInfo metadataInfo =
+ CompactionEstimateUtils.collectMetadataInfoFromDisk(
+ sourceFiles, CompactionType.CROSS_COMPACTION);
- int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
+ int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum(true);
long maxChunkSize = config.getTargetChunkSize();
long maxPageSize = tsFileConfig.getPageSizeInByte();
- int maxOverlapFileNum =
calculatingMaxOverlapFileNumInSubCompactionTask(sourceFiles);
+ int maxOverlapFileNum =
calculatingMaxOverlapFileNumInSubCompactionTask(context, sourceFiles);
// source files (chunk + uncompressed page) * overlap file num
// target files (chunk + unsealed page writer)
return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize +
maxPageSize)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FileInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FileInfo.java
index 09282cc0cdc..b3282732898 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FileInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FileInfo.java
@@ -19,7 +19,23 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.Map;
+
public class FileInfo {
+ public static final long MEMORY_COST_OF_FILE_INFO_ENTRY_IN_CACHE =
+ RamUsageEstimator.shallowSizeOfInstance(FileInfo.class)
+ + RamUsageEstimator.shallowSizeOfInstance(TsFileID.class)
+ + RamUsageEstimator.shallowSizeOfInstance(Map.Entry.class)
+ + RamUsageEstimator.NUM_BYTES_OBJECT_REF * 2L;
+ public static final long MEMORY_COST_OF_ROUGH_FILE_INFO_ENTRY_IN_CACHE =
+ RamUsageEstimator.shallowSizeOfInstance(RoughFileInfo.class)
+ + RamUsageEstimator.shallowSizeOfInstance(TsFileID.class)
+ + RamUsageEstimator.shallowSizeOfInstance(Map.Entry.class)
+ + RamUsageEstimator.NUM_BYTES_OBJECT_REF * 2L;
// total chunk num in this tsfile
int totalChunkNum = 0;
// max chunk num of one timeseries in this tsfile
@@ -34,16 +50,37 @@ public class FileInfo {
long averageChunkMetadataSize = 0;
+ long maxMemToReadAlignedSeries;
+ long maxMemToReadNonAlignedSeries;
+
public FileInfo(
int totalChunkNum,
int maxSeriesChunkNum,
int maxAlignedSeriesNumInDevice,
int maxDeviceChunkNum,
- long averageChunkMetadataSize) {
+ long averageChunkMetadataSize,
+ long maxMemToReadAlignedSeries,
+ long maxMemToReadNonAlignedSeries) {
this.totalChunkNum = totalChunkNum;
this.maxSeriesChunkNum = maxSeriesChunkNum;
this.maxAlignedSeriesNumInDevice = maxAlignedSeriesNumInDevice;
this.maxDeviceChunkNum = maxDeviceChunkNum;
this.averageChunkMetadataSize = averageChunkMetadataSize;
+ this.maxMemToReadAlignedSeries = maxMemToReadAlignedSeries;
+ this.maxMemToReadNonAlignedSeries = maxMemToReadNonAlignedSeries;
+ }
+
+ public RoughFileInfo getSimpleFileInfo() {
+ return new RoughFileInfo(maxMemToReadAlignedSeries,
maxMemToReadNonAlignedSeries);
+ }
+
+ public static class RoughFileInfo {
+ long maxMemToReadAlignedSeries;
+ long maxMemToReadNonAlignedSeries;
+
+ public RoughFileInfo(long maxMemToReadAlignedSeries, long
maxMemToReadNonAlignedSeries) {
+ this.maxMemToReadAlignedSeries = maxMemToReadAlignedSeries;
+ this.maxMemToReadNonAlignedSeries = maxMemToReadNonAlignedSeries;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
index 4e3ddad6969..96b0d882fec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
@@ -19,10 +19,9 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import java.io.IOException;
import java.util.List;
public class ReadChunkInnerCompactionEstimator extends
AbstractInnerSpaceEstimator {
@@ -31,9 +30,17 @@ public class ReadChunkInnerCompactionEstimator extends
AbstractInnerSpaceEstimat
public long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) {
long cost = 0;
// add ChunkMetadata size of MultiTsFileDeviceIterator
+ long maxAlignedSeriesMemCost =
+ taskInfo.getFileInfoList().stream()
+ .mapToLong(fileInfo -> fileInfo.maxMemToReadAlignedSeries)
+ .sum();
+ long maxNonAlignedSeriesMemCost =
+ taskInfo.getFileInfoList().stream()
+ .mapToLong(fileInfo -> fileInfo.maxMemToReadNonAlignedSeries)
+ .sum();
cost +=
Math.min(
- taskInfo.getTotalChunkMetadataSize(),
+ Math.max(maxAlignedSeriesMemCost, maxNonAlignedSeriesMemCost),
taskInfo.getFileInfoList().size()
* taskInfo.getMaxChunkMetadataNumInDevice()
* taskInfo.getMaxChunkMetadataSize());
@@ -73,14 +80,16 @@ public class ReadChunkInnerCompactionEstimator extends
AbstractInnerSpaceEstimat
}
@Override
- public long roughEstimateInnerCompactionMemory(List<TsFileResource>
resources)
- throws IOException {
+ public long roughEstimateInnerCompactionMemory(
+ CompactionScheduleContext context, List<TsFileResource> resources) {
if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
return -1L;
}
- MetadataInfo metadataInfo =
- CompactionEstimateUtils.collectMetadataInfo(resources,
CompactionType.INNER_SEQ_COMPACTION);
- int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
+ CompactionTaskMetadataInfo metadataInfo =
+ CompactionEstimateUtils.collectMetadataInfoFromCachedFileInfo(
+ resources, roughInfoMap, false);
+
+ int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum(false);
long maxChunkSize = config.getTargetChunkSize();
long maxPageSize = tsFileConfig.getPageSizeInByte();
// source files (chunk + uncompressed page)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java
index 3b20b57b025..eca5bb3dcb0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java
@@ -20,6 +20,7 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -31,10 +32,22 @@ public class RepairUnsortedFileCompactionEstimator extends
AbstractInnerSpaceEst
protected long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) {
long cost = 0;
// add ChunkMetadata size of MultiTsFileDeviceIterator
+ long maxAlignedSeriesMemCost =
+ taskInfo.getFileInfoList().stream()
+ .mapToLong(fileInfo -> fileInfo.maxMemToReadAlignedSeries)
+ .sum();
+ long maxNonAlignedSeriesMemCost =
+ taskInfo.getFileInfoList().stream()
+ .mapToLong(
+ fileInfo ->
+ fileInfo.maxMemToReadNonAlignedSeries *
config.getSubCompactionTaskNum())
+ .sum();
cost +=
Math.min(
- taskInfo.getTotalChunkMetadataSize(),
- taskInfo.getMaxChunkMetadataNumInDevice() *
taskInfo.getMaxChunkMetadataSize());
+ Math.max(maxAlignedSeriesMemCost, maxNonAlignedSeriesMemCost),
+ taskInfo.getFileInfoList().size()
+ * taskInfo.getMaxChunkMetadataNumInDevice()
+ * taskInfo.getMaxChunkMetadataSize());
// add ChunkMetadata size of targetFileWriter
long sizeForFileWriter =
@@ -72,8 +85,8 @@ public class RepairUnsortedFileCompactionEstimator extends
AbstractInnerSpaceEst
}
@Override
- public long roughEstimateInnerCompactionMemory(List<TsFileResource>
resources)
- throws IOException {
+ public long roughEstimateInnerCompactionMemory(
+ CompactionScheduleContext context, List<TsFileResource> resources)
throws IOException {
throw new RuntimeException("unimplemented");
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
index b800a1a9815..ee24ea60748 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java
@@ -19,17 +19,23 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IInnerCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.TsFileResourceCandidate;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.SystemMetric;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -41,6 +47,8 @@ import java.util.stream.Stream;
public class NewSizeTieredCompactionSelector extends
SizeTieredCompactionSelector {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private List<TsFileResourceCandidate> tsFileResourceCandidateList = new
ArrayList<>();
private final long totalFileSizeThreshold;
// the total file num in one task can not exceed this value
@@ -150,6 +158,10 @@ public class NewSizeTieredCompactionSelector extends
SizeTieredCompactionSelecto
levelTaskSelection.endCurrentTaskSelection();
break;
}
+ if (!levelTaskSelection.canSelectMoreFilesInMemoryBudget(currentFile))
{
+ levelTaskSelection.endCurrentTaskSelection();
+ break;
+ }
levelTaskSelection.addSelectedResource(currentFile, idx);
}
levelTaskSelection.endCurrentTaskSelection();
@@ -172,8 +184,27 @@ public class NewSizeTieredCompactionSelector extends
SizeTieredCompactionSelecto
int lastSelectedFileIndex = -1;
int nextTaskStartIndex = -1;
+ boolean estimateCompactionTaskMemoryDuringSelection;
+ boolean reachMemoryLimit = false;
+ IInnerCompactionPerformer performer;
+ AbstractInnerSpaceEstimator estimator;
+ long memoryCost;
+
private InnerSpaceCompactionTaskSelection(long level) {
this.level = level;
+ resetMemoryEstimationFields();
+ }
+
+ private void resetMemoryEstimationFields() {
+ estimateCompactionTaskMemoryDuringSelection = true;
+ reachMemoryLimit = false;
+ performer =
+ sequence ? context.getSeqCompactionPerformer() :
context.getUnseqCompactionPerformer();
+ estimator = performer.getInnerSpaceEstimator().orElse(null);
+ if (estimator == null) {
+ estimateCompactionTaskMemoryDuringSelection = false;
+ }
+ memoryCost = 0;
}
private boolean haveOverlappedDevices(TsFileResourceCandidate
resourceCandidate)
@@ -214,6 +245,31 @@ public class NewSizeTieredCompactionSelector extends
SizeTieredCompactionSelecto
return currentSelectedResources.isEmpty();
}
+ private boolean canSelectMoreFilesInMemoryBudget(TsFileResourceCandidate
currentFile)
+ throws IOException {
+ // can not get enough information to estimate memory cost
+ if (!estimateCompactionTaskMemoryDuringSelection) {
+ return true;
+ }
+ if (!estimator.hasCachedRoughFileInfo(currentFile.resource)) {
+ estimateCompactionTaskMemoryDuringSelection = false;
+ return true;
+ }
+ memoryCost =
+ estimator.roughEstimateInnerCompactionMemory(
+ context,
+ Stream.concat(currentSelectedResources.stream(),
Stream.of(currentFile.resource))
+ .collect(Collectors.toList()));
+ if (memoryCost < 0) {
+ return false;
+ }
+ if (memoryCost > SystemInfo.getInstance().getMemorySizeForCompaction()) {
+ reachMemoryLimit = true;
+ return false;
+ }
+ return true;
+ }
+
private void reset() {
currentSelectedResources = new ArrayList<>();
currentSkippedResources = new ArrayList<>();
@@ -221,6 +277,7 @@ public class NewSizeTieredCompactionSelector extends
SizeTieredCompactionSelecto
lastContinuousSkippedResources = new ArrayList<>();
currentSelectedFileTotalSize = 0;
currentSkippedFileTotalSize = 0;
+ resetMemoryEstimationFields();
}
private boolean isTaskTooLarge(TsFileResourceCandidate currentFile) {
@@ -244,7 +301,11 @@ public class NewSizeTieredCompactionSelector extends
SizeTieredCompactionSelecto
long currentFileSize = resource.getTsFileSize();
if (totalFileSize + currentFileSize > singleFileSizeThreshold
|| totalFileNum + 1 > totalFileNumUpperBound
- ||
!isFileLevelSatisfied(resource.getTsFileID().getInnerCompactionCount())) {
+ ||
!isFileLevelSatisfied(resource.getTsFileID().getInnerCompactionCount())
+ // if estimateCompactionTaskMemoryDuringSelection is true, we
have used the
+ // selected files for memory estimation. To ensure consistent
results, we
+ // will not add other files for merging.
+ || estimateCompactionTaskMemoryDuringSelection) {
break;
}
currentSkippedResources.add(resource);
@@ -259,7 +320,12 @@ public class NewSizeTieredCompactionSelector extends
SizeTieredCompactionSelecto
}
boolean canCompactAllFiles =
- totalFileSize <= singleFileSizeThreshold && totalFileNum <=
totalFileNumUpperBound;
+ totalFileSize <= singleFileSizeThreshold
+ && totalFileNum <= totalFileNumUpperBound
+ // if estimateCompactionTaskMemoryDuringSelection is true, we
have used the
+ // selected files for memory estimation. To ensure consistent
results, we
+ // will not add other files for merging.
+ && !estimateCompactionTaskMemoryDuringSelection;
if (canCompactAllFiles) {
currentSelectedResources =
Stream.concat(currentSelectedResources.stream(),
currentSkippedResources.stream())
@@ -273,10 +339,14 @@ public class NewSizeTieredCompactionSelector extends
SizeTieredCompactionSelecto
boolean isSatisfied =
(currentSelectedResources.size() >= totalFileNumLowerBound
|| !isActiveTimePartition
- || currentSelectedFileTotalSize >= singleFileSizeThreshold)
+ || currentSelectedFileTotalSize >= singleFileSizeThreshold
+ || reachMemoryLimit)
&& currentSelectedResources.size() > 1;
if (isSatisfied) {
InnerSpaceCompactionTask task = createInnerSpaceCompactionTask();
+ if (estimateCompactionTaskMemoryDuringSelection) {
+ task.setRoughMemoryCost(memoryCost);
+ }
selectedTaskList.add(task);
}
} finally {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 804d441d2f2..ffad8a487da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -237,7 +237,7 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
long roughEstimatedMemoryCost =
compactionEstimator.roughEstimateCrossCompactionMemory(
- newSelectedSeqResources, newSelectedUnseqResources);
+ context, newSelectedSeqResources, newSelectedUnseqResources);
long memoryCost =
CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost)
? roughEstimatedMemoryCost
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 1e692697f44..e8cd62b6d16 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.slf4j.Logger;
@@ -404,8 +405,13 @@ public class SystemInfo {
memorySizeForMemtable =
(long)
(config.getAllocateMemoryForStorageEngine() *
config.getWriteProportionForMemtable());
+
memorySizeForCompaction =
(long) (config.getAllocateMemoryForStorageEngine() *
config.getCompactionProportion());
+ long fixedMemoryCost =
+
AbstractCompactionEstimator.allocateMemoryCostForFileInfoCache(memorySizeForCompaction);
+ memorySizeForCompaction -= fixedMemoryCost;
+
memorySizeForWalBufferQueue =
(long)
(config.getAllocateMemoryForStorageEngine()
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
index b208163a8ca..e96ea2653d8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
@@ -23,9 +23,13 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCompactionEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCrossSpaceCompactionEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.NewSizeTieredCompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.exception.write.WriteProcessException;
@@ -43,12 +47,13 @@ import java.util.List;
public class CompactionTaskMemCostEstimatorTest extends AbstractCompactionTest
{
- int compactionBatchSize =
-
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
+ int compactionBatchSize;
@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException,
InterruptedException {
+ compactionBatchSize =
+
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
super.setUp();
}
@@ -116,39 +121,57 @@ public class CompactionTaskMemCostEstimatorTest extends
AbstractCompactionTest {
}
@Test
- public void testEstimateWithNegativeBatchSize() throws IOException {
- TsFileResource resource = createEmptyFileAndResource(true);
- try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
- writer.startChunkGroup("d1");
- List<String> measurements = new ArrayList<>();
+ public void testRoughEstimate() throws IOException {
+ boolean cacheEnabled =
AbstractCompactionEstimator.isGlobalFileInfoCacheEnabled();
+ if (!cacheEnabled) {
+ AbstractCompactionEstimator.enableFileInfoCacheForTest(100, 100);
+ }
+ try {
for (int i = 0; i < 10; i++) {
- measurements.add("s" + i);
+ TsFileResource resource = createEmptyFileAndResource(false);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ List<String> measurements = new ArrayList<>();
+ for (int j = 0; j < 10; j++) {
+ measurements.add("s" + j);
+ }
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ measurements,
+ new TimeRange[] {new TimeRange(0, 10000)},
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED);
+ writer.endChunkGroup();
+
+ writer.startChunkGroup("d2");
+ for (int j = 0; j < 10; j++) {
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s" + j,
+ new TimeRange[] {new TimeRange(0, 10000)},
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED);
+ }
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ seqResources.add(resource);
}
- writer.generateSimpleAlignedSeriesToCurrentDevice(
- measurements,
- new TimeRange[] {new TimeRange(0, 10000)},
- TSEncoding.PLAIN,
- CompressionType.UNCOMPRESSED);
- writer.endChunkGroup();
-
- writer.startChunkGroup("d2");
- for (int i = 0; i < 10; i++) {
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s" + i,
- new TimeRange[] {new TimeRange(0, 10000)},
- TSEncoding.PLAIN,
- CompressionType.UNCOMPRESSED);
+ NewSizeTieredCompactionSelector selector =
+ new NewSizeTieredCompactionSelector(
+ COMPACTION_TEST_SG, "0", 0, true, tsFileManager, new
CompactionScheduleContext());
+ List<InnerSpaceCompactionTask> innerSpaceCompactionTasks =
+ selector.selectInnerSpaceTask(seqResources);
+ Assert.assertEquals(1, innerSpaceCompactionTasks.size());
+ Assert.assertEquals(-1,
innerSpaceCompactionTasks.get(0).getRoughMemoryCost());
+ long estimatedMemoryCost =
innerSpaceCompactionTasks.get(0).getEstimatedMemoryCost();
+ Assert.assertTrue(estimatedMemoryCost > 0);
+
+ innerSpaceCompactionTasks = selector.selectInnerSpaceTask(seqResources);
+ Assert.assertEquals(1, innerSpaceCompactionTasks.size());
+ Assert.assertTrue(innerSpaceCompactionTasks.get(0).getRoughMemoryCost()
> 0);
+ } finally {
+ if (!cacheEnabled) {
+ AbstractCompactionEstimator.disableFileInfoCacheForTest();
}
- writer.endChunkGroup();
- writer.endFile();
}
- seqResources.add(resource);
-
IoTDBDescriptor.getInstance().getConfig().setCompactionMaxAlignedSeriesNumInOneBatch(-1);
- ReadChunkInnerCompactionEstimator estimator = new
ReadChunkInnerCompactionEstimator();
- long v1 = estimator.roughEstimateInnerCompactionMemory(seqResources);
- Assert.assertTrue(v1 < 0);
-
IoTDBDescriptor.getInstance().getConfig().setCompactionMaxAlignedSeriesNumInOneBatch(10);
- long v2 = estimator.roughEstimateInnerCompactionMemory(seqResources);
- Assert.assertTrue(v2 > 0);
}
}