This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d65058b35 [IOTDB-4492] Add necessary log and control total size for 
cross space compaction task (#7399)
0d65058b35 is described below

commit 0d65058b359fc0dccb0ff030e6cfc9be25c7a9d2
Author: Mrquan <[email protected]>
AuthorDate: Thu Sep 22 15:23:21 2022 +0800

    [IOTDB-4492] Add necessary log and control total size for cross space 
compaction task (#7399)
---
 .../resources/conf/iotdb-datanode.properties       |  5 ++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  5 ++
 .../compaction/cross/CrossSpaceCompactionTask.java | 17 ++++---
 .../RewriteCrossSpaceCompactionSelector.java       | 57 +++++++++++++++++++++-
 5 files changed, 88 insertions(+), 7 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties 
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 6a275d7bf3..f4cf85d05c 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -624,6 +624,11 @@ timestamp_precision=ms
 # Datatype: int
 # max_cross_compaction_candidate_file_num=1000
 
+# The max total size of the candidate files selected for one cross space compaction task
+# At least one unseq file and its overlapped seq files will be selected even if their total size exceeds this limit
+# Datatype: long, Unit: byte
+# max_cross_compaction_candidate_file_size=5368709120
+
 # If one merge file selection runs for more than this time, it will be ended 
and its current
 # selection will be used as final selection.
 # When < 0, it means time is unbounded.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c277e6570e..5513ff51c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -476,6 +476,9 @@ public class IoTDBConfig {
   /** The max candidate file num in cross space compaction */
   private int maxCrossCompactionCandidateFileNum = 1000;
 
+  /** The max total size of candidate files in cross space compaction */
+  private long maxCrossCompactionCandidateFileSize = 1024 * 1024 * 1024 * 5L;
+
   /** The interval of compaction task schedulation in each virtual storage 
group. The unit is ms. */
   private long compactionScheduleIntervalInMs = 60_000L;
 
@@ -2794,6 +2797,14 @@ public class IoTDBConfig {
     this.maxCrossCompactionCandidateFileNum = 
maxCrossCompactionCandidateFileNum;
   }
 
+  public long getMaxCrossCompactionCandidateFileSize() {
+    return maxCrossCompactionCandidateFileSize;
+  }
+
+  public void setMaxCrossCompactionCandidateFileSize(long 
maxCrossCompactionCandidateFileSize) {
+    this.maxCrossCompactionCandidateFileSize = 
maxCrossCompactionCandidateFileSize;
+  }
+
   public long getCompactionSubmissionIntervalInMs() {
     return compactionSubmissionIntervalInMs;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 39d590c3ec..2d0ac129d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -671,6 +671,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "max_cross_compaction_candidate_file_num",
                 
Integer.toString(conf.getMaxCrossCompactionCandidateFileNum()))));
+    conf.setMaxCrossCompactionCandidateFileSize(
+        Long.parseLong(
+            properties.getProperty(
+                "max_cross_compaction_candidate_file_size",
+                
Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
 
     conf.setCompactionWriteThroughputMbPerSec(
         Integer.parseInt(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
index 8982220e13..650404a6fc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
@@ -57,7 +57,8 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
   protected List<TsFileResource> targetTsfileResourceList;
   protected List<TsFileResource> holdReadLockList = new ArrayList<>();
   protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
-  protected long selectedFileSize = 0;
+  protected double selectedSeqFileSize = 0;
+  protected double selectedUnseqFileSize = 0;
   protected long memoryCost = 0L;
 
   public CrossSpaceCompactionTask(
@@ -112,19 +113,23 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
       }
 
       for (TsFileResource resource : selectedSequenceFiles) {
-        selectedFileSize += resource.getTsFileSize();
+        selectedSeqFileSize += resource.getTsFileSize();
       }
+
       for (TsFileResource resource : selectedUnsequenceFiles) {
-        selectedFileSize += resource.getTsFileSize();
+        selectedUnseqFileSize += resource.getTsFileSize();
       }
 
       LOGGER.info(
-          "{}-{} [Compaction] CrossSpaceCompactionTask start. Sequence files : 
{}, unsequence files : {}, total size is {} MB",
+          "{}-{} [Compaction] CrossSpaceCompactionTask start. Sequence files : 
{}, unsequence files : {}, "
+              + "sequence files size is {} MB, unsequence file size is {} MB, 
total size is {}",
           storageGroupName,
           dataRegionId,
           selectedSequenceFiles,
           selectedUnsequenceFiles,
-          ((double) selectedFileSize) / 1024.0 / 1024.0);
+          selectedSeqFileSize / 1024 / 1024,
+          selectedUnseqFileSize / 1024 / 1024,
+          (selectedSeqFileSize + selectedUnseqFileSize) / 1024 / 1024);
       logFile =
           new File(
               selectedSequenceFiles.get(0).getTsFile().getParent()
@@ -175,7 +180,7 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
             storageGroupName,
             dataRegionId,
             costTime,
-            ((double) selectedFileSize) / 1024.0d / 1024.0d / costTime);
+            (selectedSeqFileSize + selectedUnseqFileSize) / 1024 / 1024 / 
costTime);
       }
     } catch (Throwable throwable) {
       // catch throwable to handle OOM errors
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
index 35d4eb7653..b314757ef8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.engine.compaction.cross.ICrossSpaceSelector;
 import 
org.apache.iotdb.db.engine.compaction.cross.utils.AbstractCompactionEstimator;
 import org.apache.iotdb.db.engine.compaction.task.ICompactionSelector;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.db.rescon.SystemInfo;
@@ -46,6 +47,7 @@ public class RewriteCrossSpaceCompactionSelector implements 
ICrossSpaceSelector
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private final int SELECT_WARN_THRESHOLD = 10;
   protected String logicalStorageGroupName;
   protected String dataRegionId;
   protected long timePartition;
@@ -54,8 +56,10 @@ public class RewriteCrossSpaceCompactionSelector implements 
ICrossSpaceSelector
   private CrossSpaceCompactionResource resource;
 
   private long totalCost;
+  private long totalSize;
   private final long memoryBudget;
   private final int maxCrossCompactionFileNum;
+  private final long maxCrossCompactionFileSize;
 
   private List<TsFileResource> selectedUnseqFiles;
   private List<TsFileResource> selectedSeqFiles;
@@ -81,6 +85,9 @@ public class RewriteCrossSpaceCompactionSelector implements 
ICrossSpaceSelector
             / 
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
     this.maxCrossCompactionFileNum =
         
IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileNum();
+    this.maxCrossCompactionFileSize =
+        
IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileSize();
+
     this.compactionEstimator =
         ICompactionSelector.getCompactionEstimator(
             
IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer(), false);
@@ -156,6 +163,7 @@ public class RewriteCrossSpaceCompactionSelector implements 
ICrossSpaceSelector
     selectedUnseqFiles = new ArrayList<>();
 
     totalCost = 0;
+    totalSize = 0;
 
     int unseqIndex = 0;
     long startTime = System.currentTimeMillis();
@@ -190,8 +198,12 @@ public class RewriteCrossSpaceCompactionSelector 
implements ICrossSpaceSelector
 
       List<TsFileResource> tmpSelectedSeqFileResources = new ArrayList<>();
       for (int seqIndex : tmpSelectedSeqFiles) {
-        tmpSelectedSeqFileResources.add(resource.getSeqFiles().get(seqIndex));
+        TsFileResource tsFileResource = resource.getSeqFiles().get(seqIndex);
+        tmpSelectedSeqFileResources.add(tsFileResource);
+        totalSize += tsFileResource.getTsFileSize();
       }
+      totalSize += unseqFile.getTsFileSize();
+
       long newCost =
           
compactionEstimator.estimateCrossCompactionMemory(tmpSelectedSeqFileResources, 
unseqFile);
       if (!updateSelectedFiles(newCost, unseqFile)) {
@@ -214,6 +226,7 @@ public class RewriteCrossSpaceCompactionSelector implements 
ICrossSpaceSelector
     if (selectedUnseqFiles.size() == 0
         || (seqSelectedNum + selectedUnseqFiles.size() + 1 + 
tmpSelectedSeqFiles.size()
                 <= maxCrossCompactionFileNum
+            && totalSize <= maxCrossCompactionFileSize
             && totalCost + newCost < memoryBudget)) {
       selectedUnseqFiles.add(unseqFile);
 
@@ -275,12 +288,32 @@ public class RewriteCrossSpaceCompactionSelector 
implements ICrossSpaceSelector
           continue;
         }
 
+        int crossSpaceCompactionTimes = 0;
+        try {
+          TsFileNameGenerator.TsFileName tsFileName =
+              TsFileNameGenerator.getTsFileName(seqFile.getTsFile().getName());
+          crossSpaceCompactionTimes = tsFileName.getCrossCompactionCnt();
+        } catch (IOException e) {
+          LOGGER.warn("Meets IOException when selecting files for cross space 
compaction", e);
+        }
+
         long seqEndTime = seqFile.getEndTime(deviceId);
         long seqStartTime = seqFile.getStartTime(deviceId);
         if (!seqFile.isClosed()) {
           // for unclosed file, only select those that overlap with the unseq 
file
           if (unseqEndTime >= seqStartTime) {
             tmpSelectedSeqFiles.add(i);
+            if (crossSpaceCompactionTimes >= SELECT_WARN_THRESHOLD) {
+              LOGGER.warn(
+                  "{} is selected for cross space compaction, it is overlapped 
with {} in device {}. Sequence file time range:[{},{}], Unsequence file time 
range:[{},{}]",
+                  seqFile.getTsFile().getAbsolutePath(),
+                  unseqFile.getTsFile().getAbsolutePath(),
+                  deviceId,
+                  seqStartTime,
+                  seqEndTime,
+                  unseqStartTime,
+                  unseqEndTime);
+            }
           }
         } else if (unseqEndTime <= seqEndTime) {
           // if time range in unseq file is 10-20, seq file is 30-40, or
@@ -288,10 +321,32 @@ public class RewriteCrossSpaceCompactionSelector 
implements ICrossSpaceSelector
           // there is no more overlap later.
           tmpSelectedSeqFiles.add(i);
           noMoreOverlap = true;
+          if (crossSpaceCompactionTimes >= SELECT_WARN_THRESHOLD) {
+            LOGGER.warn(
+                "{} is selected for cross space compaction, it is overlapped 
with {} in device {}. Sequence file time range:[{},{}], Unsequence file time 
range:[{},{}]",
+                seqFile.getTsFile().getAbsolutePath(),
+                unseqFile.getTsFile().getAbsolutePath(),
+                deviceId,
+                seqStartTime,
+                seqEndTime,
+                unseqStartTime,
+                unseqEndTime);
+          }
         } else if (unseqStartTime <= seqEndTime) {
           // if time range in unseq file is 10-20, seq file is 0-15, then 
select this seq file and
           // there may be overlap later.
           tmpSelectedSeqFiles.add(i);
+          if (crossSpaceCompactionTimes >= SELECT_WARN_THRESHOLD) {
+            LOGGER.warn(
+                "{} is selected for cross space compaction, it is overlapped 
with {} in device {}. Sequence file time range:[{},{}], Unsequence file time 
range:[{},{}]",
+                seqFile.getTsFile().getAbsolutePath(),
+                unseqFile.getTsFile().getAbsolutePath(),
+                deviceId,
+                seqStartTime,
+                seqEndTime,
+                unseqStartTime,
+                unseqEndTime);
+          }
         }
       }
     }

Reply via email to