This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ff13ec70640 Compaction task memory is incorrect when
compaction_max_aligned_series_num_in_one_batch <= 0 (#14603) (#14639)
ff13ec70640 is described below
commit ff13ec7064051b52662715479ebbcad1c0109096
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jan 10 10:25:33 2025 +0800
Compaction task memory is incorrect when
compaction_max_aligned_series_num_in_one_batch <= 0 (#14603) (#14639)
* Compaction task memory is incorrect when
compaction_max_aligned_series_num_in_one_batch <= 0
* move MetadataInfo
* add unit test
* add unit test & fix bug
* modify MetadataInfo
* modify test
* fix bug
* rename
* fix unit test
---
.../CompactionSourceFileDeletedException.java | 32 +++++++
.../execute/task/InnerSpaceCompactionTask.java | 6 +-
.../estimator/AbstractCrossSpaceEstimator.java | 7 +-
.../estimator/AbstractInnerSpaceEstimator.java | 8 +-
.../estimator/CompactionEstimateUtils.java | 35 ++++---
.../FastCompactionInnerCompactionEstimator.java | 16 ++--
.../FastCrossSpaceCompactionEstimator.java | 17 ++--
.../selector/estimator/MetadataInfo.java | 42 +++++++++
.../ReadChunkInnerCompactionEstimator.java | 15 ++-
.../impl/RewriteCrossSpaceCompactionSelector.java | 5 +-
.../cross/CrossSpaceCompactionSelectorTest.java | 102 +--------------------
.../utils/CompactionTaskMemCostEstimatorTest.java | 48 ++++++++++
12 files changed, 176 insertions(+), 157 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionSourceFileDeletedException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionSourceFileDeletedException.java
new file mode 100644
index 00000000000..5f72dd591a9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionSourceFileDeletedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;
+
+public class CompactionSourceFileDeletedException extends RuntimeException {
+
+ public CompactionSourceFileDeletedException(String message) {
+ super(message);
+ }
+
+ @Override
+ public Throwable fillInStackTrace() {
+ return this;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index e3ab9429e97..0fb4b1d851a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
@@ -685,10 +686,13 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
innerSpaceEstimator.roughEstimateInnerCompactionMemory(
filesView.sourceFilesInCompactionPerformer);
memoryCost =
-
CompactionEstimateUtils.shouldAccurateEstimate(roughEstimatedMemoryCost)
+
CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost)
? roughEstimatedMemoryCost
: innerSpaceEstimator.estimateInnerCompactionMemory(
filesView.sourceFilesInCompactionPerformer);
+ } catch (CompactionSourceFileDeletedException e) {
+ innerSpaceEstimator.cleanup();
+ return -1;
} catch (Exception e) {
if (e instanceof StopReadTsFileByInterruptException ||
Thread.interrupted()) {
Thread.currentThread().interrupt();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
index 2bf20d616da..3b6db3127b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
@@ -45,15 +45,10 @@ public abstract class AbstractCrossSpaceEstimator extends
AbstractCompactionEsti
List<TsFileResource> resources = new ArrayList<>(seqResources.size() +
unseqResources.size());
resources.addAll(seqResources);
resources.addAll(unseqResources);
- if (!CompactionEstimateUtils.addReadLock(resources)) {
- return -1L;
- }
+ CompactionEstimateUtils.addReadLock(resources);
long cost = 0;
try {
- if (!isAllSourceFileExist(resources)) {
- return -1L;
- }
CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
cost += calculatingMetadataMemoryCost(taskInfo);
cost += calculatingDataMemoryCost(taskInfo);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
index a05d652dc32..21288883ae8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
@@ -45,15 +45,9 @@ public abstract class AbstractInnerSpaceEstimator extends
AbstractCompactionEsti
}
public long estimateInnerCompactionMemory(List<TsFileResource> resources)
throws IOException {
- if (!CompactionEstimateUtils.addReadLock(resources)) {
- return -1L;
- }
+ CompactionEstimateUtils.addReadLock(resources);
long cost;
try {
- if (!isAllSourceFileExist(resources)) {
- return -1L;
- }
-
CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
cost = calculatingMetadataMemoryCost(taskInfo);
cost += calculatingDataMemoryCost(taskInfo);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
index d1e242952f4..c332f72dc54 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
@@ -20,6 +20,7 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -101,11 +102,10 @@ public class CompactionEstimateUtils {
averageChunkMetadataSize);
}
- public static long roughEstimateMetadataCostInCompaction(
- List<TsFileResource> resources, CompactionType taskType) throws
IOException {
- if (!CompactionEstimateUtils.addReadLock(resources)) {
- return -1L;
- }
+ static MetadataInfo collectMetadataInfo(List<TsFileResource> resources,
CompactionType taskType)
+ throws IOException {
+ CompactionEstimateUtils.addReadLock(resources);
+ MetadataInfo metadataInfo = new MetadataInfo();
long cost = 0L;
Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
try {
@@ -115,23 +115,29 @@ public class CompactionEstimateUtils {
}
try (CompactionTsFileReader reader =
new CompactionTsFileReader(resource.getTsFilePath(), taskType)) {
- for (Map.Entry<IDeviceID, Long> entry :
getDeviceMetadataSizeMap(reader).entrySet()) {
+ for (Map.Entry<IDeviceID, Long> entry :
+ getDeviceMetadataSizeMapAndCollectMetadataInfo(reader,
metadataInfo).entrySet()) {
deviceMetadataSizeMap.merge(entry.getKey(), entry.getValue(),
Long::sum);
}
}
}
- return cost +
deviceMetadataSizeMap.values().stream().max(Long::compareTo).orElse(0L);
+ metadataInfo.metadataMemCost =
+ cost +
deviceMetadataSizeMap.values().stream().max(Long::compareTo).orElse(0L);
+ return metadataInfo;
} finally {
CompactionEstimateUtils.releaseReadLock(resources);
}
}
- public static Map<IDeviceID, Long>
getDeviceMetadataSizeMap(CompactionTsFileReader reader)
- throws IOException {
+ static Map<IDeviceID, Long> getDeviceMetadataSizeMapAndCollectMetadataInfo(
+ CompactionTsFileReader reader, MetadataInfo metadataInfo) throws
IOException {
Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
while (deviceIterator.hasNext()) {
- IDeviceID deviceID = deviceIterator.next().getLeft();
+ Pair<IDeviceID, Boolean> deviceAlignedPair = deviceIterator.next();
+ IDeviceID deviceID = deviceAlignedPair.getLeft();
+ boolean isAligned = deviceAlignedPair.getRight();
+ metadataInfo.hasAlignedSeries |= isAligned;
MetadataIndexNode firstMeasurementNodeOfCurrentDevice =
deviceIterator.getFirstMeasurementNodeOfCurrentDevice();
long totalTimeseriesMetadataSizeOfCurrentDevice = 0;
@@ -145,14 +151,15 @@ public class CompactionEstimateUtils {
return deviceMetadataSizeMap;
}
- public static boolean shouldAccurateEstimate(long roughEstimatedMemCost) {
+ public static boolean shouldUseRoughEstimatedResult(long
roughEstimatedMemCost) {
return roughEstimatedMemCost > 0
&& IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
* roughEstimatedMemCost
< SystemInfo.getInstance().getMemorySizeForCompaction();
}
- public static boolean addReadLock(List<TsFileResource> resources) {
+ public static void addReadLock(List<TsFileResource> resources)
+ throws CompactionSourceFileDeletedException {
for (int i = 0; i < resources.size(); i++) {
TsFileResource resource = resources.get(i);
resource.readLock();
@@ -161,10 +168,10 @@ public class CompactionEstimateUtils {
for (int j = 0; j <= i; j++) {
resources.get(j).readUnlock();
}
- return false;
+ throw new CompactionSourceFileDeletedException(
+ "source file " + resource.getTsFilePath() + " is deleted");
}
}
- return true;
}
public static void releaseReadLock(List<TsFileResource> resources) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
index 2e6fada2758..d7b3933b38d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
@@ -81,18 +81,16 @@ public class FastCompactionInnerCompactionEstimator extends
AbstractInnerSpaceEs
@Override
public long roughEstimateInnerCompactionMemory(List<TsFileResource>
resources)
throws IOException {
- long metadataCost =
- CompactionEstimateUtils.roughEstimateMetadataCostInCompaction(
+ if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
+ return -1L;
+ }
+ MetadataInfo metadataInfo =
+ CompactionEstimateUtils.collectMetadataInfo(
resources,
resources.get(0).isSeq()
? CompactionType.INNER_SEQ_COMPACTION
: CompactionType.INNER_UNSEQ_COMPACTION);
- if (metadataCost < 0) {
- return metadataCost;
- }
- int maxConcurrentSeriesNum =
- Math.max(
- config.getCompactionMaxAlignedSeriesNumInOneBatch(),
config.getSubCompactionTaskNum());
+ int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
long maxChunkSize = config.getTargetChunkSize();
long maxPageSize = tsFileConfig.getPageSizeInByte();
int maxOverlapFileNum =
calculatingMaxOverlapFileNumInSubCompactionTask(resources);
@@ -100,6 +98,6 @@ public class FastCompactionInnerCompactionEstimator extends
AbstractInnerSpaceEs
// target file (chunk + unsealed page writer)
return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize +
maxPageSize)
+ fixedMemoryBudget
- + metadataCost;
+ + metadataInfo.metadataMemCost;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
index 8fade5469c6..97f3aef7a8a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
@@ -83,20 +83,17 @@ public class FastCrossSpaceCompactionEstimator extends
AbstractCrossSpaceEstimat
@Override
public long roughEstimateCrossCompactionMemory(
List<TsFileResource> seqResources, List<TsFileResource> unseqResources)
throws IOException {
+ if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
+ return -1L;
+ }
List<TsFileResource> sourceFiles = new ArrayList<>(seqResources.size() +
unseqResources.size());
sourceFiles.addAll(seqResources);
sourceFiles.addAll(unseqResources);
- long metadataCost =
- CompactionEstimateUtils.roughEstimateMetadataCostInCompaction(
- sourceFiles, CompactionType.CROSS_COMPACTION);
- if (metadataCost < 0) {
- return metadataCost;
- }
+ MetadataInfo metadataInfo =
+ CompactionEstimateUtils.collectMetadataInfo(sourceFiles,
CompactionType.CROSS_COMPACTION);
- int maxConcurrentSeriesNum =
- Math.max(
- config.getCompactionMaxAlignedSeriesNumInOneBatch(),
config.getSubCompactionTaskNum());
+ int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
long maxChunkSize = config.getTargetChunkSize();
long maxPageSize = tsFileConfig.getPageSizeInByte();
int maxOverlapFileNum =
calculatingMaxOverlapFileNumInSubCompactionTask(sourceFiles);
@@ -104,6 +101,6 @@ public class FastCrossSpaceCompactionEstimator extends
AbstractCrossSpaceEstimat
// target files (chunk + unsealed page writer)
return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize +
maxPageSize)
+ fixedMemoryBudget
- + metadataCost;
+ + metadataInfo.metadataMemCost;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java
new file mode 100644
index 00000000000..a6474a599f4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+class MetadataInfo {
+ public long metadataMemCost;
+ public boolean hasAlignedSeries;
+
+ public int getMaxConcurrentSeriesNum() {
+ if (!hasAlignedSeries) {
+ return
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
+ }
+ int compactionMaxAlignedSeriesNumInOneBatch =
+
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
+ compactionMaxAlignedSeriesNumInOneBatch =
+ compactionMaxAlignedSeriesNumInOneBatch <= 0
+ ? Integer.MAX_VALUE
+ : compactionMaxAlignedSeriesNumInOneBatch;
+ return Math.max(
+ compactionMaxAlignedSeriesNumInOneBatch,
+ IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
index a0a396eff11..4e3ddad6969 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
@@ -75,21 +75,18 @@ public class ReadChunkInnerCompactionEstimator extends
AbstractInnerSpaceEstimat
@Override
public long roughEstimateInnerCompactionMemory(List<TsFileResource>
resources)
throws IOException {
- long metadataCost =
- CompactionEstimateUtils.roughEstimateMetadataCostInCompaction(
- resources, CompactionType.INNER_SEQ_COMPACTION);
- if (metadataCost < 0) {
- return metadataCost;
+ if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
+ return -1L;
}
- int maxConcurrentSeriesNum =
- Math.max(
- config.getCompactionMaxAlignedSeriesNumInOneBatch(),
config.getSubCompactionTaskNum());
+ MetadataInfo metadataInfo =
+ CompactionEstimateUtils.collectMetadataInfo(resources,
CompactionType.INNER_SEQ_COMPACTION);
+ int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
long maxChunkSize = config.getTargetChunkSize();
long maxPageSize = tsFileConfig.getPageSizeInByte();
// source files (chunk + uncompressed page)
// target file (chunk + unsealed page writer)
return 2 * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize)
+ fixedMemoryBudget
- + metadataCost;
+ + metadataInfo.metadataMemCost;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 0ba247c1b6b..ef1fe4e0208 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
@@ -140,6 +141,8 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
candidate.getUnseqFiles().size());
return executeTaskResourceSelection(candidate);
+ } catch (CompactionSourceFileDeletedException e) {
+ return new CrossCompactionTaskResource();
} catch (Exception e) {
if (e instanceof StopReadTsFileByInterruptException ||
Thread.interrupted()) {
Thread.currentThread().interrupt();
@@ -227,7 +230,7 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
compactionEstimator.roughEstimateCrossCompactionMemory(
newSelectedSeqResources, newSelectedUnseqResources);
long memoryCost =
-
CompactionEstimateUtils.shouldAccurateEstimate(roughEstimatedMemoryCost)
+
CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost)
? roughEstimatedMemoryCost
: compactionEstimator.estimateCrossCompactionMemory(
newSelectedSeqResources, newSelectedUnseqResources);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
index 894de73da95..02408e24c80 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
@@ -349,56 +349,7 @@ public class CrossSpaceCompactionSelectorTest extends
AbstractCompactionTest {
cd2.await();
CrossCompactionTaskResource crossCompactionTaskResource =
selector.selectOneTaskResources(candidate);
- if (!crossCompactionTaskResource.isValid()) {
- throw new RuntimeException("compaction task resource is not
valid");
- }
- if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
- throw new RuntimeException("selected seq file should be 1");
- }
- if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
- throw new RuntimeException("selected unseq file num should
be 1");
- }
-
- CrossSpaceCompactionTask crossSpaceCompactionTask =
- new CrossSpaceCompactionTask(
- 0,
- tsFileManager,
- crossCompactionTaskResource.getSeqFiles(),
- crossCompactionTaskResource.getUnseqFiles(),
- IoTDBDescriptor.getInstance()
- .getConfig()
- .getCrossCompactionPerformer()
- .createInstance(),
- crossCompactionTaskResource.getTotalMemoryCost(),
- tsFileManager.getNextCompactionTaskId());
- // set file status to COMPACTION_CANDIDATE
- if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
- throw new RuntimeException("set status should be true");
- }
- for (int i = 0; i < seqResources.size(); i++) {
- TsFileResource resource = seqResources.get(i);
- if (i < 1) {
- if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
- throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
- }
- } else if (i == 1) {
- if (resource.getStatus() != TsFileResourceStatus.DELETED) {
- throw new RuntimeException("status should be DELETED");
- }
- } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
- throw new RuntimeException("status should be NORMAL");
- }
- }
- for (int i = 0; i < unseqResources.size(); i++) {
- TsFileResource resource = unseqResources.get(i);
- if (i < 1) {
- if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
- throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
- }
- } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
- throw new RuntimeException("status should be NORMAL");
- }
- }
+ Assert.assertFalse(crossCompactionTaskResource.isValid());
} catch (Exception e) {
fail.set(true);
e.printStackTrace();
@@ -1181,56 +1132,7 @@ public class CrossSpaceCompactionSelectorTest extends
AbstractCompactionTest {
CrossCompactionTaskResource crossCompactionTaskResource =
selector.selectOneTaskResources(candidate);
- if (!crossCompactionTaskResource.isValid()) {
- throw new RuntimeException("compaction task resource is not
valid");
- }
- if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
- throw new RuntimeException("selected seq file num is not 1");
- }
- if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
- throw new RuntimeException("selected unseq file num is not
1");
- }
-
- CrossSpaceCompactionTask crossSpaceCompactionTask =
- new CrossSpaceCompactionTask(
- 0,
- tsFileManager,
- crossCompactionTaskResource.getSeqFiles(),
- crossCompactionTaskResource.getUnseqFiles(),
- IoTDBDescriptor.getInstance()
- .getConfig()
- .getCrossCompactionPerformer()
- .createInstance(),
- crossCompactionTaskResource.getTotalMemoryCost(),
- tsFileManager.getNextCompactionTaskId());
- // set file status to COMPACTION_CANDIDATE
- if
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
- throw new RuntimeException("set status should be true");
- }
- for (int i = 0; i < unseqResources.size(); i++) {
- TsFileResource resource = unseqResources.get(i);
- if (i < 1) {
- if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
- throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
- }
- } else if (i == 1) {
- if (resource.getStatus() != TsFileResourceStatus.DELETED) {
- throw new RuntimeException("status should be DELETED");
- }
- } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
- throw new RuntimeException("status should be NORMAL");
- }
- }
- for (int i = 0; i < seqResources.size(); i++) {
- TsFileResource resource = seqResources.get(i);
- if (i < 1) {
- if (resource.getStatus() !=
TsFileResourceStatus.COMPACTION_CANDIDATE) {
- throw new RuntimeException("status should be
COMPACTION_CANDIDATE");
- }
- } else if (resource.getStatus() !=
TsFileResourceStatus.NORMAL) {
- throw new RuntimeException("status should be NORMAL");
- }
- }
+ Assert.assertFalse(crossCompactionTaskResource.isValid());
} catch (Exception e) {
fail.set(true);
e.printStackTrace();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
index c7ad129bfa4..b208163a8ca 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.utils;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
@@ -28,16 +29,23 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimato
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.TimeRange;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
public class CompactionTaskMemCostEstimatorTest extends AbstractCompactionTest
{
+ int compactionBatchSize =
+
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
+
@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException,
InterruptedException {
@@ -46,6 +54,9 @@ public class CompactionTaskMemCostEstimatorTest extends
AbstractCompactionTest {
@After
public void tearDown() throws IOException, StorageEngineException {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setCompactionMaxAlignedSeriesNumInOneBatch(compactionBatchSize);
super.tearDown();
}
@@ -103,4 +114,41 @@ public class CompactionTaskMemCostEstimatorTest extends
AbstractCompactionTest {
.estimateCrossCompactionMemory(seqResources, unseqResources);
Assert.assertTrue(cost > 0);
}
+
+ @Test
+ public void testEstimateWithNegativeBatchSize() throws IOException {
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ List<String> measurements = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ measurements.add("s" + i);
+ }
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ measurements,
+ new TimeRange[] {new TimeRange(0, 10000)},
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED);
+ writer.endChunkGroup();
+
+ writer.startChunkGroup("d2");
+ for (int i = 0; i < 10; i++) {
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s" + i,
+ new TimeRange[] {new TimeRange(0, 10000)},
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED);
+ }
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ seqResources.add(resource);
+
IoTDBDescriptor.getInstance().getConfig().setCompactionMaxAlignedSeriesNumInOneBatch(-1);
+ ReadChunkInnerCompactionEstimator estimator = new
ReadChunkInnerCompactionEstimator();
+ long v1 = estimator.roughEstimateInnerCompactionMemory(seqResources);
+ Assert.assertTrue(v1 < 0);
+
IoTDBDescriptor.getInstance().getConfig().setCompactionMaxAlignedSeriesNumInOneBatch(10);
+ long v2 = estimator.roughEstimateInnerCompactionMemory(seqResources);
+ Assert.assertTrue(v2 > 0);
+ }
}