This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 185927e500 [To rel/0.13][IOTDB-5776]Update memory estimation of cross space compaction (#9623)
185927e500 is described below

commit 185927e500501b48ed486b34496564603a4d5e31
Author: 周沛辰 <[email protected]>
AuthorDate: Wed Apr 19 11:36:33 2023 +0800

    [To rel/0.13][IOTDB-5776]Update memory estimation of cross space compaction (#9623)
---
 .../resources/conf/iotdb-engine.properties         |   5 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  15 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 +-
 .../selector/AbstractCompactionEstimator.java      |   6 ++
 .../selector/RewriteCompactionFileSelector.java    |  12 ++-
 .../selector/RewriteCrossCompactionEstimator.java  | 108 +++++++++++++--------
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |   3 +
 .../cross/RewriteCompactionFileSelectorTest.java   |   4 +-
 8 files changed, 111 insertions(+), 48 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index ed6a504db1..9a626ceb53 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -453,6 +453,11 @@ timestamp_precision=ms
 # BALANCE: alternate two compaction types
 # compaction_priority=BALANCE
 
+# Whether to enable compaction memory control. If true and estimated memory size of
+# one compaction task exceeds the threshold, system will block it. It only works for cross space compaction currently.
+# Datatype: boolean
+# enable_compaction_mem_control=true
+
 # size proportion for chunk metadata maintains in memory when compacting
 # Datatype: double
 # chunk_metadata_memory_size_proportion=0.1
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f97a1d2d22..449ee57d40 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -404,6 +404,13 @@ public class IoTDBConfig {
    */
   private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
 
+  /**
+   * Enable compaction memory control or not. If true and estimated memory size of one compaction
+   * task exceeds the threshold, system will block the compaction. It only works for cross space
+   * compaction currently.
+   */
+  private boolean enableCompactionMemControl = true;
+
   private double chunkMetadataMemorySizeProportion = 0.1;
 
   /** The target tsfile size in compaction, 1 GB by default */
@@ -2823,6 +2830,14 @@ public class IoTDBConfig {
     this.customizedProperties = customizedProperties;
   }
 
+  public boolean isEnableCompactionMemControl() {
+    return enableCompactionMemControl;
+  }
+
+  public void setEnableCompactionMemControl(boolean enableCompactionMemControl) {
+    this.enableCompactionMemControl = enableCompactionMemControl;
+  }
+
   public double getChunkMetadataMemorySizeProportion() {
     return chunkMetadataMemorySizeProportion;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2616bbb15e..1ddb246600 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -344,7 +344,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "max_waiting_time_when_insert_blocked",
                 Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
-
+    conf.setEnableCompactionMemControl(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_compaction_mem_control",
+                Boolean.toString(conf.isEnableCompactionMemControl()))));
     conf.setChunkMetadataMemorySizeProportion(
         Double.parseDouble(
             properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/AbstractCompactionEstimator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/AbstractCompactionEstimator.java
index f891b73ab5..51f410f3c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/AbstractCompactionEstimator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/AbstractCompactionEstimator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.engine.compaction.cross.rewrite.selector;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
@@ -34,6 +36,10 @@ public abstract class AbstractCompactionEstimator implements AutoCloseable {
 
   protected Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
 
+  protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  protected int compressionRatio = 5;
+
   /**
    * Estimate the memory cost of compacting the unseq file and its corresponding overlapped seq
    * files in cross space compaction task.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
index 1f12fd5f24..3c95135dfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
@@ -56,6 +56,9 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
   CrossSpaceCompactionResource resource;
 
   long totalCost;
+
+  private long totalUnseqFileSize;
+  private long totalSeqFileSize;
   private long memoryBudget;
   private long maxSeqFileCost;
   private int maxCrossCompactionFileNum;
@@ -145,11 +148,14 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
     }
     if (logger.isInfoEnabled()) {
       logger.info(
-          "Selected merge candidates, {} seqFiles, {} unseqFiles, total memory cost {}, "
+          "Selected merge candidates, {} seqFiles, {} unseqFiles, total memory cost {} MB, total file size is {} MB, total seq file size is {} MB, total unseq file size is {} MB, "
               + "time consumption {}ms",
           selectedSeqFiles.size(),
           selectedUnseqFiles.size(),
-          totalCost,
+          (float) totalCost / 1024 / 1024,
+          (float) (totalUnseqFileSize + totalSeqFileSize) / 1024 / 1024,
+          (float) totalSeqFileSize / 1024 / 1024,
+          (float) totalUnseqFileSize / 1024 / 1024,
           System.currentTimeMillis() - startTime);
     }
     return new List[] {selectedSeqFiles, selectedUnseqFiles};
@@ -230,6 +236,7 @@ public class RewriteCompactionFileSelector implements 
ICrossSpaceMergeFileSelect
     for (int i = 0; i < seqSelected.length; i++) {
       if (seqSelected[i]) {
         selectedSeqFiles.add(resource.getSeqFiles().get(i));
+        totalSeqFileSize += resource.getSeqFiles().get(i).getTsFileSize();
       }
     }
   }
@@ -260,6 +267,7 @@ public class RewriteCompactionFileSelector implements 
ICrossSpaceMergeFileSelect
             && totalSize <= maxCrossCompactionFileSize
             && totalCost + newCost < memoryBudget)) {
       selectedUnseqFiles.add(unseqFile);
+      totalUnseqFileSize += unseqFile.getTsFileSize();
       maxSeqFileCost = tempMaxSeqFileCost;
 
       for (Integer seqIdx : tmpSelectedSeqFiles) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCrossCompactionEstimator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCrossCompactionEstimator.java
index 1908b1f684..49998d53a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCrossCompactionEstimator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCrossCompactionEstimator.java
@@ -23,13 +23,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -41,13 +39,10 @@ public class RewriteCrossCompactionEstimator extends AbstractCrossSpaceEstimator
   // task
   private long maxCostOfReadingSeqFile;
 
-  // left is the max chunk num in chunkgroup of unseq file, right is the total 
chunk num of unseq
-  // file.
-  private Pair<Integer, Integer> maxUnseqChunkNumInDevice;
+  // the max cost of writing target file
+  private long maxCostOfWritingTargetFile;
 
-  // it stores all chunk info of seq files. Left is the max chunk num in 
chunkgroup of seq file,
-  // right is the total chunk num of seq file.
-  private final List<Pair<Integer, Integer>> maxSeqChunkNumInDeviceList;
+  private int maxConcurrentSeriesNum = 1;
 
   // the number of timeseries being compacted at the same time
   private final int subCompactionTaskNum =
@@ -55,7 +50,7 @@ public class RewriteCrossCompactionEstimator extends 
AbstractCrossSpaceEstimator
 
   public RewriteCrossCompactionEstimator() {
     this.maxCostOfReadingSeqFile = 0;
-    this.maxSeqChunkNumInDeviceList = new ArrayList<>();
+    this.maxCostOfWritingTargetFile = 0;
   }
 
   @Override
@@ -65,7 +60,6 @@ public class RewriteCrossCompactionEstimator extends 
AbstractCrossSpaceEstimator
     cost += calculateReadingUnseqFile(unseqResource);
     cost += calculateReadingSeqFiles(seqResources);
     cost += calculatingWritingTargetFiles(seqResources, unseqResource);
-    maxSeqChunkNumInDeviceList.clear();
     return cost;
   }
 
@@ -75,25 +69,28 @@ public class RewriteCrossCompactionEstimator extends 
AbstractCrossSpaceEstimator
    */
   private long calculateReadingUnseqFile(TsFileResource unseqResource) throws IOException {
     TsFileSequenceReader reader = getFileReader(unseqResource);
-    int[] fileInfo = getSeriesAndDeviceChunkNum(reader);
+    FileInfo fileInfo = getSeriesAndDeviceChunkNum(reader);
     // it is max aligned series num of one device when tsfile contains aligned 
series,
     // else is sub compaction task num.
-    int concurrentSeriesNum = fileInfo[2] == -1 ? subCompactionTaskNum : 
fileInfo[2];
-    maxUnseqChunkNumInDevice = new Pair<>(fileInfo[3], fileInfo[0]);
-    // it means the max size of a timeseries in this file when reading all of 
its chunk into memory.
-    // Not only reading chunk into chunk cache, but also need to deserialize 
data point into merge
-    // reader, so we have to double the cost here.
-    if (fileInfo[0] == 0) { // If totalChunkNum ==0, i.e. this unSeq tsFile 
has no chunk.
+    int concurrentSeriesNum =
+        fileInfo.maxAlignedSeriesNumInDevice == -1
+            ? subCompactionTaskNum
+            : fileInfo.maxAlignedSeriesNumInDevice;
+    maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum, 
concurrentSeriesNum);
+    if (fileInfo.totalChunkNum == 0) { // If totalChunkNum ==0, i.e. this unSeq tsFile has no chunk.
       logger.warn(
           "calculateReadingUnseqFile(), find 1 empty unSeq tsFile: {}.",
           unseqResource.getTsFilePath());
       return 0;
     }
-    return 2 * concurrentSeriesNum * (unseqResource.getTsFileSize() * 
fileInfo[1] / fileInfo[0]);
+    // it means the max size of a timeseries in this file when reading all of its chunk into memory.
+    return compressionRatio
+        * concurrentSeriesNum
+        * (unseqResource.getTsFileSize() * fileInfo.maxSeriesChunkNum / fileInfo.totalChunkNum);
   }
 
   /**
-   * Calculate memory cost of reading source seq files in the cross space compaction. Double the
+   * Calculate memory cost of reading source seq files in the cross space compaction. Select the
    * maximun size of the timeseries to be compacted at the same time in one seq file, because only
    * one seq file will be queried at the same time.
    */
@@ -101,38 +98,41 @@ public class RewriteCrossCompactionEstimator extends 
AbstractCrossSpaceEstimator
     long cost = 0;
     for (TsFileResource seqResource : seqResources) {
       TsFileSequenceReader reader = getFileReader(seqResource);
-      int[] fileInfo = getSeriesAndDeviceChunkNum(reader);
+      FileInfo fileInfo = getSeriesAndDeviceChunkNum(reader);
       // it is max aligned series num of one device when tsfile contains 
aligned series,
       // else is sub compaction task num.
-      int concurrentSeriesNum = fileInfo[2] == -1 ? subCompactionTaskNum : 
fileInfo[2];
-      long seqFileCost = 0;
-      if (fileInfo[0] == 0) { // If totalChunkNum ==0, i.e. this seq tsFile 
has no chunk.
+      int concurrentSeriesNum =
+          fileInfo.maxAlignedSeriesNumInDevice == -1
+              ? subCompactionTaskNum
+              : fileInfo.maxAlignedSeriesNumInDevice;
+      maxConcurrentSeriesNum = Math.max(maxConcurrentSeriesNum, 
concurrentSeriesNum);
+      long seqFileCost;
+      if (fileInfo.totalChunkNum == 0) { // If totalChunkNum ==0, i.e. this 
seq tsFile has no chunk.
         logger.warn(
             "calculateReadingSeqFiles(), find 1 empty seq tsFile: {}.",
             seqResource.getTsFilePath());
         seqFileCost = 0;
       } else {
-        seqFileCost =
-            concurrentSeriesNum * (seqResource.getTsFileSize() * fileInfo[1] / 
fileInfo[0]);
+        // We need to multiply the compression ratio here.
+        seqFileCost = compressionRatio * concurrentSeriesNum * 
config.getTargetChunkSize();
       }
 
       if (seqFileCost > maxCostOfReadingSeqFile) {
         // Only one seq file will be read at the same time.
         // not only reading chunk into chunk cache, but also need to 
deserialize data point into
-        // merge reader, so we have to double the cost here.
-        cost -= 2 * maxCostOfReadingSeqFile;
-        cost += 2 * seqFileCost;
+        // merge reader. We have to add the cost in merge reader here and the 
cost of chunk cache is
+        // unnecessary.
+        cost -= maxCostOfReadingSeqFile;
+        cost += seqFileCost;
         maxCostOfReadingSeqFile = seqFileCost;
       }
-      maxSeqChunkNumInDeviceList.add(new Pair<>(fileInfo[3], fileInfo[0]));
     }
     return cost;
   }
 
   /**
    * Calculate memory cost of writing target files in the cross space 
compaction. Including metadata
-   * size of all seq files, max chunk group size of each seq file and max 
chunk group size of
-   * corresponding overlapped unseq file.
+   * size of all source files and size of concurrent target chunks.
    */
   private long calculatingWritingTargetFiles(
       List<TsFileResource> seqResources, TsFileResource unseqResource) throws 
IOException {
@@ -141,17 +141,16 @@ public class RewriteCrossCompactionEstimator extends 
AbstractCrossSpaceEstimator
       TsFileSequenceReader reader = getFileReader(seqResource);
       // add seq file metadata size
       cost += reader.getFileMetadataSize();
-      // add max chunk group size of this seq tsfile
-      int totalSeqChunkNum = maxSeqChunkNumInDeviceList.get(0).right;
-      if (totalSeqChunkNum > 0) {
-        cost +=
-            seqResource.getTsFileSize() * 
maxSeqChunkNumInDeviceList.get(0).left / totalSeqChunkNum;
-      }
     }
-    // add max chunk group size of overlapped unseq tsfile
-    int totalUnSeqChunkNum = maxUnseqChunkNumInDevice.right;
-    if (totalUnSeqChunkNum > 0) {
-      cost += unseqResource.getTsFileSize() * maxUnseqChunkNumInDevice.left / 
totalUnSeqChunkNum;
+    // add unseq file metadata size
+    cost += getFileReader(unseqResource).getFileMetadataSize();
+
+    // concurrent series chunk size
+    long writingTargetCost = maxConcurrentSeriesNum * config.getTargetChunkSize();
+    if (writingTargetCost > maxCostOfWritingTargetFile) {
+      cost -= maxCostOfWritingTargetFile;
+      cost += writingTargetCost;
+      maxCostOfWritingTargetFile = writingTargetCost;
     }
 
     return cost;
@@ -169,7 +168,7 @@ public class RewriteCrossCompactionEstimator extends 
AbstractCrossSpaceEstimator
    *
    * <p>max chunk num of one device in this tsfile
    */
-  private int[] getSeriesAndDeviceChunkNum(TsFileSequenceReader reader) throws IOException {
+  private FileInfo getSeriesAndDeviceChunkNum(TsFileSequenceReader reader) throws IOException {
     int totalChunkNum = 0;
     int maxChunkNum = 0;
     int maxAlignedSeriesNumInDevice = -1;
@@ -190,6 +189,29 @@ public class RewriteCrossCompactionEstimator extends 
AbstractCrossSpaceEstimator
       }
       maxDeviceChunkNum = Math.max(maxDeviceChunkNum, deviceChunkNum);
     }
-    return new int[] {totalChunkNum, maxChunkNum, maxAlignedSeriesNumInDevice, maxDeviceChunkNum};
+    return new FileInfo(totalChunkNum, maxChunkNum, maxAlignedSeriesNumInDevice, maxDeviceChunkNum);
+  }
+
+  private class FileInfo {
+    // total chunk num in this tsfile
+    public int totalChunkNum = 0;
+    // max chunk num of one timeseries in this tsfile
+    public int maxSeriesChunkNum = 0;
+    // max aligned series num in one device. If there is no aligned series in this file, then it
+    // turns to be -1.
+    public int maxAlignedSeriesNumInDevice = -1;
+    // max chunk num of one device in this tsfile
+    public int maxDeviceChunkNum = 0;
+
+    public FileInfo(
+        int totalChunkNum,
+        int maxSeriesChunkNum,
+        int maxAlignedSeriesNumInDevice,
+        int maxDeviceChunkNum) {
+      this.totalChunkNum = totalChunkNum;
+      this.maxSeriesChunkNum = maxSeriesChunkNum;
+      this.maxAlignedSeriesNumInDevice = maxAlignedSeriesNumInDevice;
+      this.maxDeviceChunkNum = maxDeviceChunkNum;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 71050ec1b7..e99a2c36ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -278,6 +278,9 @@ public class SystemInfo {
   }
 
   public void addCompactionMemoryCost(long memoryCost) throws InterruptedException {
+    if (!config.isEnableCompactionMemControl()) {
+      return;
+    }
     long originSize = this.compactionMemoryCost.get();
     while (originSize + memoryCost > memorySizeForCompaction
         || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
index 2d20e3220e..0b50e9392b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
@@ -101,8 +101,8 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
     List[] result = mergeFileSelector.select();
     List<TsFileResource> seqSelected = result[0];
     List<TsFileResource> unseqSelected = result[1];
-    assertEquals(seqResources.subList(0, 3), seqSelected);
-    assertEquals(unseqResources.subList(0, 3), unseqSelected);
+    assertEquals(seqResources.subList(0, 1), seqSelected);
+    assertEquals(unseqResources.subList(0, 1), unseqSelected);
     resource.clear();
   }
 

Reply via email to