This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 52243ce  [IOTDB-1357] Compaction use append chunk merge strategy when chunk is already large (#3159) (#3188)
52243ce is described below

commit 52243ce47fa7c2806eb7e146cc4d91ee88354110
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Wed May 19 11:07:11 2021 +0800

    [IOTDB-1357] Compaction use append chunk merge strategy when chunk is already large (#3159) (#3188)
---
 .../engine/compaction/utils/CompactionUtils.java   | 81 +++++++++++++++++-----
 .../db/engine/compaction/CompactionChunkTest.java  |  4 +-
 .../compaction/LevelCompactionMergeTest.java       | 80 ++++++++++++++++++++-
 3 files changed, 144 insertions(+), 21 deletions(-)

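For readers skimming the diff below: compaction now picks one of three merge strategies per series on the sequence path (unsequence files still always take the deserialize path). A minimal sketch of the selection logic, assuming plain long point counts and thresholds; in the actual code the counts come from ChunkMetadata.getNumOfPoints() and the thresholds from IoTDBDescriptor.getInstance().getConfig():

    // Sketch only, not the IoTDB source: pick a merge strategy from the
    // smallest point count over all chunks of a series.
    class MergeStrategySketch {
      enum MergeStrategy { APPEND_CHUNK, APPEND_PAGE, DESERIALIZE_PAGE }

      static MergeStrategy choose(
          long minPointsPerChunk, long chunkPointThreshold, long pagePointThreshold) {
        if (minPointsPerChunk >= chunkPointThreshold) {
          // every chunk already meets merge_chunk_point_number:
          // copy whole chunks without deserializing anything
          return MergeStrategy.APPEND_CHUNK;
        }
        if (minPointsPerChunk >= pagePointThreshold) {
          // chunks meet merge_page_point_number but not the chunk threshold:
          // append their pages into a new chunk
          return MergeStrategy.APPEND_PAGE;
        }
        // pages are small too: deserialize points and re-encode them
        return MergeStrategy.DESERIALIZE_PAGE;
      }
    }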
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 9bc3ce2..a6aa68c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -67,14 +67,12 @@ import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData;
 public class CompactionUtils {
 
   private static final Logger logger = LoggerFactory.getLogger(CompactionUtils.class);
-  private static final int MERGE_PAGE_POINT_NUM =
-      IoTDBDescriptor.getInstance().getConfig().getMergePagePointNumberThreshold();
 
   private CompactionUtils() {
     throw new IllegalStateException("Utility class");
   }
 
-  private static Pair<ChunkMetadata, Chunk> readByAppendMerge(
+  private static Pair<ChunkMetadata, Chunk> readByAppendPageMerge(
       Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException {
     ChunkMetadata newChunkMetadata = null;
     Chunk newChunk = null;
@@ -96,7 +94,7 @@ public class CompactionUtils {
     return new Pair<>(newChunkMetadata, newChunk);
   }
 
-  private static void readByDeserializeMerge(
+  private static void readByDeserializePageMerge(
       Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap,
       Map<Long, TimeValuePair> timeValuePairMap,
       Map<String, List<Modification>> modificationCache,
@@ -122,14 +120,43 @@ public class CompactionUtils {
     }
   }
 
-  public static void writeByAppendMerge(
+  /**
+   * When chunks are large enough, we do not have to merge them anymore. Just read the chunks and
+   * write them to the new file directly.
+   */
+  public static void writeByAppendChunkMerge(
       String device,
       RateLimiter compactionWriteRateLimiter,
       Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
       TsFileResource targetResource,
       RestorableTsFileIOWriter writer)
       throws IOException {
-    Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(entry.getValue());
+    Map<TsFileSequenceReader, List<ChunkMetadata>> readerListMap = entry.getValue();
+    for (Entry<TsFileSequenceReader, List<ChunkMetadata>> readerListEntry :
+        readerListMap.entrySet()) {
+      TsFileSequenceReader reader = readerListEntry.getKey();
+      List<ChunkMetadata> chunkMetadataList = readerListEntry.getValue();
+      // read chunk and write it to new file directly
+      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        Chunk chunk = reader.readMemChunk(chunkMetadata);
+        MergeManager.mergeRateLimiterAcquire(
+            compactionWriteRateLimiter,
+            (long) chunk.getHeader().getDataSize() + chunk.getData().position());
+        writer.writeChunk(chunk, chunkMetadata);
+        targetResource.updateStartTime(device, chunkMetadata.getStartTime());
+        targetResource.updateEndTime(device, chunkMetadata.getEndTime());
+      }
+    }
+  }
+
+  public static void writeByAppendPageMerge(
+      String device,
+      RateLimiter compactionWriteRateLimiter,
+      Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
+      TsFileResource targetResource,
+      RestorableTsFileIOWriter writer)
+      throws IOException {
+    Pair<ChunkMetadata, Chunk> chunkPair = readByAppendPageMerge(entry.getValue());
     ChunkMetadata newChunkMetadata = chunkPair.left;
     Chunk newChunk = chunkPair.right;
     if (newChunkMetadata != null && newChunk != null) {
@@ -143,7 +170,7 @@ public class CompactionUtils {
     }
   }
 
-  public static void writeByDeserializeMerge(
+  public static void writeByDeserializePageMerge(
       String device,
       RateLimiter compactionRateLimiter,
       Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
@@ -154,7 +181,7 @@ public class CompactionUtils {
       throws IOException, IllegalPathException {
     Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
     Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap = entry.getValue();
-    readByDeserializeMerge(
+    readByDeserializePageMerge(
         readerChunkMetadataMap,
         timeValuePairMap,
         modificationCache,
@@ -317,7 +344,7 @@ public class CompactionUtils {
                 sensorReaderChunkMetadataListEntry =
                     new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
             if (!sequence) {
-              writeByDeserializeMerge(
+              writeByDeserializePageMerge(
                   device,
                   compactionWriteRateLimiter,
                   sensorReaderChunkMetadataListEntry,
@@ -326,28 +353,50 @@ public class CompactionUtils {
                   modificationCache,
                   modifications);
             } else {
+              boolean isChunkEnoughLarge = true;
               boolean isPageEnoughLarge = true;
               for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
                 for (ChunkMetadata chunkMetadata : chunkMetadatas) {
-                  if (chunkMetadata.getNumOfPoints() < MERGE_PAGE_POINT_NUM) {
+                  if (chunkMetadata.getNumOfPoints()
+                      < IoTDBDescriptor.getInstance()
+                          .getConfig()
+                          .getMergePagePointNumberThreshold()) {
                     isPageEnoughLarge = false;
-                    break;
+                  }
+                  if (chunkMetadata.getNumOfPoints()
+                      < IoTDBDescriptor.getInstance()
+                          .getConfig()
+                          .getMergeChunkPointNumberThreshold()) {
+                    isChunkEnoughLarge = false;
                   }
                 }
               }
-              if (isPageEnoughLarge) {
-                logger.debug("{} [Compaction] page enough large, use append merge", storageGroup);
+              // if a chunk is large enough, its pages must be large enough too
+              if (isChunkEnoughLarge) {
+                logger.debug(
+                    "{} [Compaction] chunk enough large, use append chunk 
merge", storageGroup);
+                // append page in chunks, so we do not have to deserialize a 
chunk
+                writeByAppendChunkMerge(
+                    device,
+                    compactionWriteRateLimiter,
+                    sensorReaderChunkMetadataListEntry,
+                    targetResource,
+                    writer);
+              } else if (isPageEnoughLarge) {
+                logger.debug(
+                    "{} [Compaction] page enough large, use append page 
merge", storageGroup);
                 // append page in chunks, so we do not have to deserialize a 
chunk
-                writeByAppendMerge(
+                writeByAppendPageMerge(
                     device,
                     compactionWriteRateLimiter,
                     sensorReaderChunkMetadataListEntry,
                     targetResource,
                     writer);
               } else {
-                logger.debug("{} [Compaction] page too small, use deserialize merge", storageGroup);
+                logger.debug(
+                    "{} [Compaction] page too small, use deserialize page 
merge", storageGroup);
                 // we have to deserialize chunks to merge pages
-                writeByDeserializeMerge(
+                writeByDeserializePageMerge(
                     device,
                     compactionWriteRateLimiter,
                     sensorReaderChunkMetadataListEntry,
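Both append paths above throttle on raw chunk bytes, charging the header data size plus the data buffer position against the shared compaction RateLimiter before each write. As an aside, a hedged sketch of how a long byte count can be charged against a Guava RateLimiter, whose acquire(int) takes an int; this is an assumption about what a helper like MergeManager.mergeRateLimiterAcquire does, not a copy of the IoTDB implementation:

    import com.google.common.util.concurrent.RateLimiter;

    class ThrottleSketch {
      // Block until 'bytes' permits are granted, consuming them in int-sized
      // steps because RateLimiter.acquire(int) cannot take a long.
      static void acquireBytes(RateLimiter limiter, long bytes) {
        while (bytes > 0) {
          int step = (int) Math.min(bytes, Integer.MAX_VALUE);
          limiter.acquire(step);
          bytes -= step;
        }
      }
    }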
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
index 8a61859..5e7276a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
@@ -120,7 +120,7 @@ public class CompactionChunkTest extends LevelCompactionTest {
       }
       for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry :
           measurementChunkMetadataMap.entrySet()) {
-        CompactionUtils.writeByAppendMerge(
+        CompactionUtils.writeByAppendPageMerge(
             device, compactionWriteRateLimiter, entry, targetTsfileResource, writer);
       }
       reader.close();
@@ -199,7 +199,7 @@ public class CompactionChunkTest extends LevelCompactionTest {
       }
       for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry :
           measurementChunkMetadataMap.entrySet()) {
-        CompactionUtils.writeByDeserializeMerge(
+        CompactionUtils.writeByDeserializePageMerge(
             device,
             compactionWriteRateLimiter,
             entry,
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 9b3984b..579fd88 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 
@@ -47,6 +49,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -210,9 +213,6 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
     long count = 0L;
     while (tsFilesReader.hasNextBatch()) {
       BatchData batchData = tsFilesReader.nextBatch();
-      for (int i = 0; i < batchData.length(); i++) {
-        System.out.println(batchData.getTimeByIndex(i));
-      }
       count += batchData.length();
     }
     assertEquals(489, count);
@@ -225,6 +225,80 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
     IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(prevPageLimit);
   }
 
+  /** test append chunk merge, where the chunk is already larger than merge_chunk_point_number */
+  @Test
+  public void testCompactionAppendChunkMerge() throws IOException {
+    int prevMergeChunkPointNumberThreshold =
+        IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
+    IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(1);
+
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+    levelCompactionTsFileManagement.addAll(seqResources, true);
+    levelCompactionTsFileManagement.addAll(unseqResources, false);
+    levelCompactionTsFileManagement.forkCurrentFileList(0);
+    CompactionMergeTask compactionMergeTask =
+        levelCompactionTsFileManagement
+        .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+    compactionMergeWorking = true;
+    compactionMergeTask.call();
+    while (compactionMergeWorking) {
+      // wait
+    }
+    TsFileResource newTsFileResource =
+        levelCompactionTsFileManagement.getTsFileListByTimePartition(true, 0).get(0);
+    TsFileSequenceReader tsFileSequenceReader =
+        new TsFileSequenceReader(newTsFileResource.getTsFilePath());
+    Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+        tsFileSequenceReader.readChunkMetadataInDevice(deviceIds[0]);
+    for (List<ChunkMetadata> chunkMetadataList : sensorChunkMetadataListMap.values()) {
+      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        assertEquals(20, chunkMetadata.getNumOfPoints());
+      }
+    }
+    tsFileSequenceReader.close();
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setMergeChunkPointNumberThreshold(prevMergeChunkPointNumberThreshold);
+  }
+
+  /** test that append chunk merge is not used, as the chunk is smaller than merge_chunk_point_number */
+  @Test
+  public void testCompactionNoAppendChunkMerge() throws IOException {
+    int prevMergeChunkPointNumberThreshold =
+        IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
+    IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(100000);
+
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+    levelCompactionTsFileManagement.addAll(seqResources, true);
+    levelCompactionTsFileManagement.addAll(unseqResources, false);
+    levelCompactionTsFileManagement.forkCurrentFileList(0);
+    CompactionMergeTask compactionMergeTask =
+        levelCompactionTsFileManagement
+        .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+    compactionMergeWorking = true;
+    compactionMergeTask.call();
+    while (compactionMergeWorking) {
+      // wait
+    }
+    TsFileResource newTsFileResource =
+        levelCompactionTsFileManagement.getTsFileListByTimePartition(true, 0).get(0);
+    TsFileSequenceReader tsFileSequenceReader =
+        new TsFileSequenceReader(newTsFileResource.getTsFilePath());
+    Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+        tsFileSequenceReader.readChunkMetadataInDevice(deviceIds[0]);
+    for (List<ChunkMetadata> chunkMetadataList : sensorChunkMetadataListMap.values()) {
+      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        assertEquals(500, chunkMetadata.getNumOfPoints());
+      }
+    }
+    tsFileSequenceReader.close();
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setMergeChunkPointNumberThreshold(prevMergeChunkPointNumberThreshold);
+  }
+
   /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack(
       boolean isMergeExecutedInCurrentTask, long timePartitionId) {
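A note on the two new tests above: each saves the global merge_chunk_point_number threshold, overrides it, runs a compaction, asserts on the resulting chunk sizes, and restores the threshold only as the last statement, so a failing assertion would leave the override in place. A minimal sketch of the same save/override/restore wrapped in try/finally (runCompactionAndAssert is a hypothetical stand-in for the test body, not part of this patch):

    int prev = IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
    IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(1);
    try {
      runCompactionAndAssert(); // hypothetical: compaction + assertions as in the tests above
    } finally {
      // restore the shared config even if an assertion fails
      IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(prev);
    }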
