This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ff13ec70640 Compaction task memory is incorrect when 
compaction_max_aligned_series_num_in_one_batch <= 0 (#14603) (#14639)
ff13ec70640 is described below

commit ff13ec7064051b52662715479ebbcad1c0109096
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jan 10 10:25:33 2025 +0800

    Compaction task memory is incorrect when 
compaction_max_aligned_series_num_in_one_batch <= 0 (#14603) (#14639)
    
    * Compaction task memory is incorrect when 
compaction_max_aligned_series_num_in_one_batch <= 0
    
    * move MetadataInfo
    
    * add ut
    
    * add ut & fix bug
    
    * modify MetadataInfo
    
    * modify test
    
    * fix bug
    
    * rename
    
    * fix ut
---
 .../CompactionSourceFileDeletedException.java      |  32 +++++++
 .../execute/task/InnerSpaceCompactionTask.java     |   6 +-
 .../estimator/AbstractCrossSpaceEstimator.java     |   7 +-
 .../estimator/AbstractInnerSpaceEstimator.java     |   8 +-
 .../estimator/CompactionEstimateUtils.java         |  35 ++++---
 .../FastCompactionInnerCompactionEstimator.java    |  16 ++--
 .../FastCrossSpaceCompactionEstimator.java         |  17 ++--
 .../selector/estimator/MetadataInfo.java           |  42 +++++++++
 .../ReadChunkInnerCompactionEstimator.java         |  15 ++-
 .../impl/RewriteCrossSpaceCompactionSelector.java  |   5 +-
 .../cross/CrossSpaceCompactionSelectorTest.java    | 102 +--------------------
 .../utils/CompactionTaskMemCostEstimatorTest.java  |  48 ++++++++++
 12 files changed, 176 insertions(+), 157 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionSourceFileDeletedException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionSourceFileDeletedException.java
new file mode 100644
index 00000000000..5f72dd591a9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionSourceFileDeletedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;
+
+public class CompactionSourceFileDeletedException extends RuntimeException {
+
+  public CompactionSourceFileDeletedException(String message) {
+    super(message);
+  }
+
+  @Override
+  public Throwable fillInStackTrace() {
+    return this;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index e3ab9429e97..0fb4b1d851a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
@@ -685,10 +686,13 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
             innerSpaceEstimator.roughEstimateInnerCompactionMemory(
                 filesView.sourceFilesInCompactionPerformer);
         memoryCost =
-            
CompactionEstimateUtils.shouldAccurateEstimate(roughEstimatedMemoryCost)
+            
CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost)
                 ? roughEstimatedMemoryCost
                 : innerSpaceEstimator.estimateInnerCompactionMemory(
                     filesView.sourceFilesInCompactionPerformer);
+      } catch (CompactionSourceFileDeletedException e) {
+        innerSpaceEstimator.cleanup();
+        return -1;
       } catch (Exception e) {
         if (e instanceof StopReadTsFileByInterruptException || 
Thread.interrupted()) {
           Thread.currentThread().interrupt();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
index 2bf20d616da..3b6db3127b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java
@@ -45,15 +45,10 @@ public abstract class AbstractCrossSpaceEstimator extends 
AbstractCompactionEsti
     List<TsFileResource> resources = new ArrayList<>(seqResources.size() + 
unseqResources.size());
     resources.addAll(seqResources);
     resources.addAll(unseqResources);
-    if (!CompactionEstimateUtils.addReadLock(resources)) {
-      return -1L;
-    }
+    CompactionEstimateUtils.addReadLock(resources);
 
     long cost = 0;
     try {
-      if (!isAllSourceFileExist(resources)) {
-        return -1L;
-      }
       CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
       cost += calculatingMetadataMemoryCost(taskInfo);
       cost += calculatingDataMemoryCost(taskInfo);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
index a05d652dc32..21288883ae8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java
@@ -45,15 +45,9 @@ public abstract class AbstractInnerSpaceEstimator extends 
AbstractCompactionEsti
   }
 
   public long estimateInnerCompactionMemory(List<TsFileResource> resources) 
throws IOException {
-    if (!CompactionEstimateUtils.addReadLock(resources)) {
-      return -1L;
-    }
+    CompactionEstimateUtils.addReadLock(resources);
     long cost;
     try {
-      if (!isAllSourceFileExist(resources)) {
-        return -1L;
-      }
-
       CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
       cost = calculatingMetadataMemoryCost(taskInfo);
       cost += calculatingDataMemoryCost(taskInfo);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
index d1e242952f4..c332f72dc54 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
@@ -20,6 +20,7 @@
 package 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -101,11 +102,10 @@ public class CompactionEstimateUtils {
         averageChunkMetadataSize);
   }
 
-  public static long roughEstimateMetadataCostInCompaction(
-      List<TsFileResource> resources, CompactionType taskType) throws 
IOException {
-    if (!CompactionEstimateUtils.addReadLock(resources)) {
-      return -1L;
-    }
+  static MetadataInfo collectMetadataInfo(List<TsFileResource> resources, 
CompactionType taskType)
+      throws IOException {
+    CompactionEstimateUtils.addReadLock(resources);
+    MetadataInfo metadataInfo = new MetadataInfo();
     long cost = 0L;
     Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
     try {
@@ -115,23 +115,29 @@ public class CompactionEstimateUtils {
         }
         try (CompactionTsFileReader reader =
             new CompactionTsFileReader(resource.getTsFilePath(), taskType)) {
-          for (Map.Entry<IDeviceID, Long> entry : 
getDeviceMetadataSizeMap(reader).entrySet()) {
+          for (Map.Entry<IDeviceID, Long> entry :
+              getDeviceMetadataSizeMapAndCollectMetadataInfo(reader, 
metadataInfo).entrySet()) {
             deviceMetadataSizeMap.merge(entry.getKey(), entry.getValue(), 
Long::sum);
           }
         }
       }
-      return cost + 
deviceMetadataSizeMap.values().stream().max(Long::compareTo).orElse(0L);
+      metadataInfo.metadataMemCost =
+          cost + 
deviceMetadataSizeMap.values().stream().max(Long::compareTo).orElse(0L);
+      return metadataInfo;
     } finally {
       CompactionEstimateUtils.releaseReadLock(resources);
     }
   }
 
-  public static Map<IDeviceID, Long> 
getDeviceMetadataSizeMap(CompactionTsFileReader reader)
-      throws IOException {
+  static Map<IDeviceID, Long> getDeviceMetadataSizeMapAndCollectMetadataInfo(
+      CompactionTsFileReader reader, MetadataInfo metadataInfo) throws 
IOException {
     Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
     TsFileDeviceIterator deviceIterator = 
reader.getAllDevicesIteratorWithIsAligned();
     while (deviceIterator.hasNext()) {
-      IDeviceID deviceID = deviceIterator.next().getLeft();
+      Pair<IDeviceID, Boolean> deviceAlignedPair = deviceIterator.next();
+      IDeviceID deviceID = deviceAlignedPair.getLeft();
+      boolean isAligned = deviceAlignedPair.getRight();
+      metadataInfo.hasAlignedSeries |= isAligned;
       MetadataIndexNode firstMeasurementNodeOfCurrentDevice =
           deviceIterator.getFirstMeasurementNodeOfCurrentDevice();
       long totalTimeseriesMetadataSizeOfCurrentDevice = 0;
@@ -145,14 +151,15 @@ public class CompactionEstimateUtils {
     return deviceMetadataSizeMap;
   }
 
-  public static boolean shouldAccurateEstimate(long roughEstimatedMemCost) {
+  public static boolean shouldUseRoughEstimatedResult(long 
roughEstimatedMemCost) {
     return roughEstimatedMemCost > 0
         && IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
                 * roughEstimatedMemCost
             < SystemInfo.getInstance().getMemorySizeForCompaction();
   }
 
-  public static boolean addReadLock(List<TsFileResource> resources) {
+  public static void addReadLock(List<TsFileResource> resources)
+      throws CompactionSourceFileDeletedException {
     for (int i = 0; i < resources.size(); i++) {
       TsFileResource resource = resources.get(i);
       resource.readLock();
@@ -161,10 +168,10 @@ public class CompactionEstimateUtils {
         for (int j = 0; j <= i; j++) {
           resources.get(j).readUnlock();
         }
-        return false;
+        throw new CompactionSourceFileDeletedException(
+            "source file " + resource.getTsFilePath() + " is deleted");
       }
     }
-    return true;
   }
 
   public static void releaseReadLock(List<TsFileResource> resources) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
index 2e6fada2758..d7b3933b38d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java
@@ -81,18 +81,16 @@ public class FastCompactionInnerCompactionEstimator extends 
AbstractInnerSpaceEs
   @Override
   public long roughEstimateInnerCompactionMemory(List<TsFileResource> 
resources)
       throws IOException {
-    long metadataCost =
-        CompactionEstimateUtils.roughEstimateMetadataCostInCompaction(
+    if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
+      return -1L;
+    }
+    MetadataInfo metadataInfo =
+        CompactionEstimateUtils.collectMetadataInfo(
             resources,
             resources.get(0).isSeq()
                 ? CompactionType.INNER_SEQ_COMPACTION
                 : CompactionType.INNER_UNSEQ_COMPACTION);
-    if (metadataCost < 0) {
-      return metadataCost;
-    }
-    int maxConcurrentSeriesNum =
-        Math.max(
-            config.getCompactionMaxAlignedSeriesNumInOneBatch(), 
config.getSubCompactionTaskNum());
+    int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
     long maxChunkSize = config.getTargetChunkSize();
     long maxPageSize = tsFileConfig.getPageSizeInByte();
     int maxOverlapFileNum = 
calculatingMaxOverlapFileNumInSubCompactionTask(resources);
@@ -100,6 +98,6 @@ public class FastCompactionInnerCompactionEstimator extends 
AbstractInnerSpaceEs
     // target file (chunk + unsealed page writer)
     return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize + 
maxPageSize)
         + fixedMemoryBudget
-        + metadataCost;
+        + metadataInfo.metadataMemCost;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
index 8fade5469c6..97f3aef7a8a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java
@@ -83,20 +83,17 @@ public class FastCrossSpaceCompactionEstimator extends 
AbstractCrossSpaceEstimat
   @Override
   public long roughEstimateCrossCompactionMemory(
       List<TsFileResource> seqResources, List<TsFileResource> unseqResources) 
throws IOException {
+    if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
+      return -1L;
+    }
     List<TsFileResource> sourceFiles = new ArrayList<>(seqResources.size() + 
unseqResources.size());
     sourceFiles.addAll(seqResources);
     sourceFiles.addAll(unseqResources);
 
-    long metadataCost =
-        CompactionEstimateUtils.roughEstimateMetadataCostInCompaction(
-            sourceFiles, CompactionType.CROSS_COMPACTION);
-    if (metadataCost < 0) {
-      return metadataCost;
-    }
+    MetadataInfo metadataInfo =
+        CompactionEstimateUtils.collectMetadataInfo(sourceFiles, 
CompactionType.CROSS_COMPACTION);
 
-    int maxConcurrentSeriesNum =
-        Math.max(
-            config.getCompactionMaxAlignedSeriesNumInOneBatch(), 
config.getSubCompactionTaskNum());
+    int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
     long maxChunkSize = config.getTargetChunkSize();
     long maxPageSize = tsFileConfig.getPageSizeInByte();
     int maxOverlapFileNum = 
calculatingMaxOverlapFileNumInSubCompactionTask(sourceFiles);
@@ -104,6 +101,6 @@ public class FastCrossSpaceCompactionEstimator extends 
AbstractCrossSpaceEstimat
     // target files (chunk + unsealed page writer)
     return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize + 
maxPageSize)
         + fixedMemoryBudget
-        + metadataCost;
+        + metadataInfo.metadataMemCost;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java
new file mode 100644
index 00000000000..a6474a599f4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+class MetadataInfo {
+  public long metadataMemCost;
+  public boolean hasAlignedSeries;
+
+  public int getMaxConcurrentSeriesNum() {
+    if (!hasAlignedSeries) {
+      return 
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
+    }
+    int compactionMaxAlignedSeriesNumInOneBatch =
+        
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
+    compactionMaxAlignedSeriesNumInOneBatch =
+        compactionMaxAlignedSeriesNumInOneBatch <= 0
+            ? Integer.MAX_VALUE
+            : compactionMaxAlignedSeriesNumInOneBatch;
+    return Math.max(
+        compactionMaxAlignedSeriesNumInOneBatch,
+        IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
index a0a396eff11..4e3ddad6969 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java
@@ -75,21 +75,18 @@ public class ReadChunkInnerCompactionEstimator extends 
AbstractInnerSpaceEstimat
   @Override
   public long roughEstimateInnerCompactionMemory(List<TsFileResource> 
resources)
       throws IOException {
-    long metadataCost =
-        CompactionEstimateUtils.roughEstimateMetadataCostInCompaction(
-            resources, CompactionType.INNER_SEQ_COMPACTION);
-    if (metadataCost < 0) {
-      return metadataCost;
+    if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
+      return -1L;
     }
-    int maxConcurrentSeriesNum =
-        Math.max(
-            config.getCompactionMaxAlignedSeriesNumInOneBatch(), 
config.getSubCompactionTaskNum());
+    MetadataInfo metadataInfo =
+        CompactionEstimateUtils.collectMetadataInfo(resources, 
CompactionType.INNER_SEQ_COMPACTION);
+    int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
     long maxChunkSize = config.getTargetChunkSize();
     long maxPageSize = tsFileConfig.getPageSizeInByte();
     // source files (chunk + uncompressed page)
     // target file (chunk + unsealed page writer)
     return 2 * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize)
         + fixedMemoryBudget
-        + metadataCost;
+        + metadataInfo.metadataMemCost;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 0ba247c1b6b..ef1fe4e0208 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
@@ -140,6 +141,8 @@ public class RewriteCrossSpaceCompactionSelector implements 
ICrossSpaceSelector
           candidate.getUnseqFiles().size());
 
       return executeTaskResourceSelection(candidate);
+    } catch (CompactionSourceFileDeletedException e) {
+      return new CrossCompactionTaskResource();
     } catch (Exception e) {
       if (e instanceof StopReadTsFileByInterruptException || 
Thread.interrupted()) {
         Thread.currentThread().interrupt();
@@ -227,7 +230,7 @@ public class RewriteCrossSpaceCompactionSelector implements 
ICrossSpaceSelector
           compactionEstimator.roughEstimateCrossCompactionMemory(
               newSelectedSeqResources, newSelectedUnseqResources);
       long memoryCost =
-          
CompactionEstimateUtils.shouldAccurateEstimate(roughEstimatedMemoryCost)
+          
CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost)
               ? roughEstimatedMemoryCost
               : compactionEstimator.estimateCrossCompactionMemory(
                   newSelectedSeqResources, newSelectedUnseqResources);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
index 894de73da95..02408e24c80 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
@@ -349,56 +349,7 @@ public class CrossSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
                 cd2.await();
                 CrossCompactionTaskResource crossCompactionTaskResource =
                     selector.selectOneTaskResources(candidate);
-                if (!crossCompactionTaskResource.isValid()) {
-                  throw new RuntimeException("compaction task resource is not 
valid");
-                }
-                if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
-                  throw new RuntimeException("selected seq file should be 1");
-                }
-                if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
-                  throw new RuntimeException("selected unseq file num should 
be 1");
-                }
-
-                CrossSpaceCompactionTask crossSpaceCompactionTask =
-                    new CrossSpaceCompactionTask(
-                        0,
-                        tsFileManager,
-                        crossCompactionTaskResource.getSeqFiles(),
-                        crossCompactionTaskResource.getUnseqFiles(),
-                        IoTDBDescriptor.getInstance()
-                            .getConfig()
-                            .getCrossCompactionPerformer()
-                            .createInstance(),
-                        crossCompactionTaskResource.getTotalMemoryCost(),
-                        tsFileManager.getNextCompactionTaskId());
-                // set file status to COMPACTION_CANDIDATE
-                if 
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
-                  throw new RuntimeException("set status should be true");
-                }
-                for (int i = 0; i < seqResources.size(); i++) {
-                  TsFileResource resource = seqResources.get(i);
-                  if (i < 1) {
-                    if (resource.getStatus() != 
TsFileResourceStatus.COMPACTION_CANDIDATE) {
-                      throw new RuntimeException("status should be 
COMPACTION_CANDIDATE");
-                    }
-                  } else if (i == 1) {
-                    if (resource.getStatus() != TsFileResourceStatus.DELETED) {
-                      throw new RuntimeException("status should be DELETED");
-                    }
-                  } else if (resource.getStatus() != 
TsFileResourceStatus.NORMAL) {
-                    throw new RuntimeException("status should be NORMAL");
-                  }
-                }
-                for (int i = 0; i < unseqResources.size(); i++) {
-                  TsFileResource resource = unseqResources.get(i);
-                  if (i < 1) {
-                    if (resource.getStatus() != 
TsFileResourceStatus.COMPACTION_CANDIDATE) {
-                      throw new RuntimeException("status should be 
COMPACTION_CANDIDATE");
-                    }
-                  } else if (resource.getStatus() != 
TsFileResourceStatus.NORMAL) {
-                    throw new RuntimeException("status should be NORMAL");
-                  }
-                }
+                Assert.assertFalse(crossCompactionTaskResource.isValid());
               } catch (Exception e) {
                 fail.set(true);
                 e.printStackTrace();
@@ -1181,56 +1132,7 @@ public class CrossSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
 
                 CrossCompactionTaskResource crossCompactionTaskResource =
                     selector.selectOneTaskResources(candidate);
-                if (!crossCompactionTaskResource.isValid()) {
-                  throw new RuntimeException("compaction task resource is not 
valid");
-                }
-                if (crossCompactionTaskResource.getSeqFiles().size() != 1) {
-                  throw new RuntimeException("selected seq file num is not 1");
-                }
-                if (crossCompactionTaskResource.getUnseqFiles().size() != 1) {
-                  throw new RuntimeException("selected unseq file num is not 
1");
-                }
-
-                CrossSpaceCompactionTask crossSpaceCompactionTask =
-                    new CrossSpaceCompactionTask(
-                        0,
-                        tsFileManager,
-                        crossCompactionTaskResource.getSeqFiles(),
-                        crossCompactionTaskResource.getUnseqFiles(),
-                        IoTDBDescriptor.getInstance()
-                            .getConfig()
-                            .getCrossCompactionPerformer()
-                            .createInstance(),
-                        crossCompactionTaskResource.getTotalMemoryCost(),
-                        tsFileManager.getNextCompactionTaskId());
-                // set file status to COMPACTION_CANDIDATE
-                if 
(!crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()) {
-                  throw new RuntimeException("set status should be true");
-                }
-                for (int i = 0; i < unseqResources.size(); i++) {
-                  TsFileResource resource = unseqResources.get(i);
-                  if (i < 1) {
-                    if (resource.getStatus() != 
TsFileResourceStatus.COMPACTION_CANDIDATE) {
-                      throw new RuntimeException("status should be 
COMPACTION_CANDIDATE");
-                    }
-                  } else if (i == 1) {
-                    if (resource.getStatus() != TsFileResourceStatus.DELETED) {
-                      throw new RuntimeException("status should be DELETED");
-                    }
-                  } else if (resource.getStatus() != 
TsFileResourceStatus.NORMAL) {
-                    throw new RuntimeException("status should be NORMAL");
-                  }
-                }
-                for (int i = 0; i < seqResources.size(); i++) {
-                  TsFileResource resource = seqResources.get(i);
-                  if (i < 1) {
-                    if (resource.getStatus() != 
TsFileResourceStatus.COMPACTION_CANDIDATE) {
-                      throw new RuntimeException("status should be 
COMPACTION_CANDIDATE");
-                    }
-                  } else if (resource.getStatus() != 
TsFileResourceStatus.NORMAL) {
-                    throw new RuntimeException("status should be NORMAL");
-                  }
-                }
+                Assert.assertFalse(crossCompactionTaskResource.isValid());
               } catch (Exception e) {
                 fail.set(true);
                 e.printStackTrace();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
index c7ad129bfa4..b208163a8ca 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.compaction.utils;
 
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
@@ -28,16 +29,23 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimato
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.TimeRange;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 public class CompactionTaskMemCostEstimatorTest extends AbstractCompactionTest 
{
 
+  int compactionBatchSize =
+      
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
+
   @Before
   public void setUp()
       throws IOException, WriteProcessException, MetadataException, 
InterruptedException {
@@ -46,6 +54,9 @@ public class CompactionTaskMemCostEstimatorTest extends 
AbstractCompactionTest {
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setCompactionMaxAlignedSeriesNumInOneBatch(compactionBatchSize);
     super.tearDown();
   }
 
@@ -103,4 +114,41 @@ public class CompactionTaskMemCostEstimatorTest extends 
AbstractCompactionTest {
             .estimateCrossCompactionMemory(seqResources, unseqResources);
     Assert.assertTrue(cost > 0);
   }
+
+  @Test
+  public void testEstimateWithNegativeBatchSize() throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource)) {
+      writer.startChunkGroup("d1");
+      List<String> measurements = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        measurements.add("s" + i);
+      }
+      writer.generateSimpleAlignedSeriesToCurrentDevice(
+          measurements,
+          new TimeRange[] {new TimeRange(0, 10000)},
+          TSEncoding.PLAIN,
+          CompressionType.UNCOMPRESSED);
+      writer.endChunkGroup();
+
+      writer.startChunkGroup("d2");
+      for (int i = 0; i < 10; i++) {
+        writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+            "s" + i,
+            new TimeRange[] {new TimeRange(0, 10000)},
+            TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED);
+      }
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    seqResources.add(resource);
+    
IoTDBDescriptor.getInstance().getConfig().setCompactionMaxAlignedSeriesNumInOneBatch(-1);
+    ReadChunkInnerCompactionEstimator estimator = new 
ReadChunkInnerCompactionEstimator();
+    long v1 = estimator.roughEstimateInnerCompactionMemory(seqResources);
+    Assert.assertTrue(v1 < 0);
+    
IoTDBDescriptor.getInstance().getConfig().setCompactionMaxAlignedSeriesNumInOneBatch(10);
+    long v2 = estimator.roughEstimateInnerCompactionMemory(seqResources);
+    Assert.assertTrue(v2 > 0);
+  }
 }


Reply via email to