This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c61fd434f0be58fa1b291fca9f9e8caebc5ff725
Author: 张凌哲 <[email protected]>
AuthorDate: Thu Oct 8 13:07:44 2020 +0800

    add merge page point number
---
 .../resources/conf/iotdb-engine.properties         |   4 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  27 +++--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +
 .../tsfilemanagement/utils/HotCompactionUtils.java | 116 ++++++++++++++-------
 4 files changed, 105 insertions(+), 46 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 97ded95..5fb3777 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -279,6 +279,10 @@ max_unseq_level_num=2
 # When merge point number reaches this, merge the files to the last level.
 merge_chunk_point_number=100000
 
+# Works when tsfile_manage_strategy is level_strategy.
+# When the page point number of a file reaches this, use append merge instead of deserialize merge.
+merge_page_point_number=1000
+
 # How many threads will be set up to perform merge main tasks, 1 by default.
 # Set to 1 when less than or equal to 0.
 merge_thread_num=1
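
For orientation: the new knob gates which merge path hot compaction takes per measurement. A minimal sketch of the decision rule, assuming a hypothetical standalone helper (AppendMergeGate and useAppendMerge are illustrative names, not IoTDB API; the real check lives in HotCompactionUtils below):

  import java.util.List;

  final class AppendMergeGate {
    // Illustrative only: any chunk below the threshold forces the deserialize
    // (decode and re-encode) path; otherwise chunks append without re-encoding.
    static boolean useAppendMerge(List<Long> chunkPointCounts, int mergePagePointNum) {
      for (long numOfPoints : chunkPointCounts) {
        if (numOfPoints < mergePagePointNum) {
          return false;
        }
      }
      return true;
    }
  }
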
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5322c07..d53892f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -261,13 +261,19 @@ public class IoTDBConfig {
   private int mergeChunkPointNumberThreshold = 100000;
 
   /**
+   * Works when tsfile_manage_strategy is level_strategy. When the page point number of a file
+   * reaches this, use append merge instead of deserialize merge.
+   */
+  private int mergePagePointNumberThreshold = 1000;
+
+  /**
   * TsFile management strategy, defines which hot compaction strategy to use
    */
   private TsFileManagementStrategy tsFileManagementStrategy = TsFileManagementStrategy.NORMAL_STRATEGY;
 
   /**
-   * Work when tsfile_manage_strategy is level_strategy. The max seq file num of each level. When file
-   * num exceeds this, the files in one level will merge to one.
+   * Work when tsfile_manage_strategy is level_strategy. The max seq file num of each level. When
+   * file num exceeds this, the files in one level will merge to one.
    */
   private int maxFileNumInEachLevel = 10;
 
@@ -277,8 +283,8 @@ public class IoTDBConfig {
   private int maxLevelNum = 4;
 
   /**
-   * Work when tsfile_manage_strategy is level_strategy. The max unseq file num of each level. When file
-   * num exceeds this, the files in one level will merge to one.
+   * Work when tsfile_manage_strategy is level_strategy. The max unseq file num of each level. When
+   * file num exceeds this, the files in one level will merge to one.
    */
   private int maxUnseqFileNumInEachLevel = 10;
 
@@ -617,8 +623,9 @@ public class IoTDBConfig {
   private int defaultFillInterval = -1;
 
   /**
-   * default TTL for storage groups that are not set TTL by statements, in ms
-   * Notice: if this property is changed, previous created storage group which are not set TTL will also be affected.
+   * default TTL for storage groups that are not set TTL by statements, in ms Notice: if this
+   * property is changed, previous created storage group which are not set TTL will also be
+   * affected.
    */
   private long defaultTTL = Long.MAX_VALUE;
 
@@ -1284,6 +1291,14 @@ public class IoTDBConfig {
     this.mergeChunkPointNumberThreshold = mergeChunkPointNumberThreshold;
   }
 
+  public int getMergePagePointNumberThreshold() {
+    return mergePagePointNumberThreshold;
+  }
+
+  public void setMergePagePointNumberThreshold(int mergePagePointNumberThreshold) {
+    this.mergePagePointNumberThreshold = mergePagePointNumberThreshold;
+  }
+
   public MergeFileStrategy getMergeFileStrategy() {
     return mergeFileStrategy;
   }
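
A hedged usage sketch of the new accessors; the read chain matches how HotCompactionUtils caches the threshold later in this commit, while the probe class itself is a hypothetical example:

  import org.apache.iotdb.db.conf.IoTDBConfig;
  import org.apache.iotdb.db.conf.IoTDBDescriptor;

  public class MergePageThresholdProbe {
    public static void main(String[] args) {
      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
      // merge_page_point_number from iotdb-engine.properties, default 1000
      System.out.println(config.getMergePagePointNumberThreshold());
    }
  }
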
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b77f4d0..0d35a0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -275,6 +275,10 @@ public class IoTDBDescriptor {
           .getProperty("merge_chunk_point_number",
               Integer.toString(conf.getMergeChunkPointNumberThreshold()))));
 
+      conf.setMergePagePointNumberThreshold(Integer.parseInt(properties
+          .getProperty("merge_page_point_number",
+              Integer.toString(conf.getMergePagePointNumberThreshold()))));
+
       conf.setTsFileManagementStrategy(TsFileManagementStrategy.valueOf(properties
           .getProperty("tsfile_manage_strategy",
               conf.getTsFileManagementStrategy().toString())));
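
The load above follows the file's usual idiom: parse the property if present, otherwise fall back to the compiled-in default by stringifying the current config value. A self-contained sketch of that idiom (PropertyLoadSketch and loadInt are hypothetical names):

  import java.util.Properties;

  final class PropertyLoadSketch {
    // Mirrors the descriptor's pattern: the current value doubles as the default.
    static int loadInt(Properties props, String key, int currentValue) {
      return Integer.parseInt(props.getProperty(key, Integer.toString(currentValue)));
    }

    public static void main(String[] args) {
      Properties props = new Properties();
      props.setProperty("merge_page_point_number", "2000");
      System.out.println(loadInt(props, "merge_page_point_number", 1000));    // 2000
      System.out.println(loadInt(props, "merge_chunk_point_number", 100000)); // falls back to 100000
    }
  }
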
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
index 20aa0d2..35e5bd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
@@ -24,7 +24,6 @@ import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair;
 import com.google.common.util.concurrent.RateLimiter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -34,10 +33,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
@@ -56,12 +55,14 @@ import org.slf4j.LoggerFactory;
 public class HotCompactionUtils {
 
   private static final Logger logger = LoggerFactory.getLogger(HotCompactionUtils.class);
+  private static final int mergePagePointNum = IoTDBDescriptor.getInstance().getConfig()
+      .getMergePagePointNumberThreshold();
 
   private HotCompactionUtils() {
     throw new IllegalStateException("Utility class");
   }
 
-  private static Pair<ChunkMetadata, Chunk> readSeqChunk(
+  private static Pair<ChunkMetadata, Chunk> readByAppendMerge(
       Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException {
     ChunkMetadata newChunkMetadata = null;
     Chunk newChunk = null;
@@ -81,7 +82,7 @@ public class HotCompactionUtils {
     return new Pair<>(newChunkMetadata, newChunk);
   }
 
-  private static long readUnseqChunk(
+  private static long readByDeserializeMerge(
       Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap, long maxVersion,
       Map<Long, TimeValuePair> timeValuePairMap)
       throws IOException {
@@ -106,6 +107,54 @@ public class HotCompactionUtils {
     return maxVersion;
   }
 
+  private static long writeByAppendMerge(long maxVersion, String device,
+      RateLimiter compactionRateLimiter,
+      Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadatasMap,
+      TsFileResource targetResource, RestorableTsFileIOWriter writer) throws IOException {
+    Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(readerChunkMetadatasMap);
+    ChunkMetadata newChunkMetadata = chunkPair.left;
+    Chunk newChunk = chunkPair.right;
+    if (newChunkMetadata != null && newChunk != null) {
+      maxVersion = Math.max(newChunkMetadata.getVersion(), maxVersion);
+      // wait for limit write
+      MergeManager.mergeRateLimiterAcquire(compactionRateLimiter,
+          newChunk.getHeader().getDataSize() + newChunk.getData().position());
+      writer.writeChunk(newChunk, newChunkMetadata);
+      targetResource.updateStartTime(device, newChunkMetadata.getStartTime());
+      targetResource.updateEndTime(device, newChunkMetadata.getEndTime());
+    }
+    return maxVersion;
+  }
+
+  private static long writeByDeserializeMerge(long maxVersion, String device,
+      RateLimiter compactionRateLimiter,
+      Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
+      TsFileResource targetResource, RestorableTsFileIOWriter writer) throws IOException {
+    Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
+    maxVersion = readByDeserializeMerge(entry.getValue(), maxVersion, timeValuePairMap);
+    Iterator<List<ChunkMetadata>> chunkMetadataListIterator = entry.getValue().values()
+        .iterator();
+    if (!chunkMetadataListIterator.hasNext()) {
+      return maxVersion;
+    }
+    List<ChunkMetadata> chunkMetadataList = chunkMetadataListIterator.next();
+    if (chunkMetadataList.size() <= 0) {
+      return maxVersion;
+    }
+    IChunkWriter chunkWriter = new ChunkWriterImpl(
+        new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType()));
+    for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
+      writeTVPair(timeValuePair, chunkWriter);
+      targetResource.updateStartTime(device, timeValuePair.getTimestamp());
+      targetResource.updateEndTime(device, timeValuePair.getTimestamp());
+    }
+    // wait for limit write
+    MergeManager
+        .mergeRateLimiterAcquire(compactionRateLimiter, chunkWriter.getCurrentChunkSize());
+    chunkWriter.writeToFileWriter(writer);
+    return maxVersion;
+  }
+
   private static Set<String> getTsFileDevicesSet(List<TsFileResource> subLevelResources,
       Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String storageGroup)
       throws IOException {
@@ -177,51 +226,38 @@ public class HotCompactionUtils {
         long maxVersion = Long.MIN_VALUE;
         for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap
             .entrySet()) {
-          Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
-          maxVersion = readUnseqChunk(entry.getValue(), maxVersion, timeValuePairMap);
-          Iterator<List<ChunkMetadata>> chunkMetadataListIterator = entry.getValue().values()
-              .iterator();
-          if (!chunkMetadataListIterator.hasNext()) {
-            continue;
-          }
-          List<ChunkMetadata> chunkMetadataList = chunkMetadataListIterator.next();
-          if (chunkMetadataList.size() <= 0) {
-            continue;
-          }
-          IChunkWriter chunkWriter = new ChunkWriterImpl(
-              new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType()));
-          for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
-            writeTVPair(timeValuePair, chunkWriter);
-            targetResource.updateStartTime(device, timeValuePair.getTimestamp());
-            targetResource.updateEndTime(device, timeValuePair.getTimestamp());
-          }
-          // wait for limit write
-          MergeManager
-              .mergeRateLimiterAcquire(compactionRateLimiter, chunkWriter.getCurrentChunkSize());
-          chunkWriter.writeToFileWriter(writer);
-          if (hotCompactionLogger != null) {
-            hotCompactionLogger.logDevice(device, writer.getPos());
-          }
+          maxVersion = writeByDeserializeMerge(maxVersion, device, compactionRateLimiter, entry,
+              targetResource, writer);
         }
         writer.endChunkGroup();
         writer.writeVersion(maxVersion);
+        if (hotCompactionLogger != null) {
+          hotCompactionLogger.logDevice(device, writer.getPos());
+        }
       } else {
+        long maxVersion = Long.MIN_VALUE;
         for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap
             .entrySet()) {
-          Pair<ChunkMetadata, Chunk> chunkPair = readSeqChunk(
-              entry.getValue());
-          ChunkMetadata newChunkMetadata = chunkPair.left;
-          Chunk newChunk = chunkPair.right;
-          if (newChunkMetadata != null && newChunk != null) {
-            // wait for limit write
-            MergeManager.mergeRateLimiterAcquire(compactionRateLimiter,
-                (long)newChunk.getHeader().getDataSize() + newChunk.getData().position());
-            writer.writeChunk(newChunk, newChunkMetadata);
-            targetResource.updateStartTime(device, newChunkMetadata.getStartTime());
-            targetResource.updateEndTime(device, newChunkMetadata.getEndTime());
+          Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadatasMap = entry.getValue();
+          boolean isPageEnoughLarge = true;
+          for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadatasMap.values()) {
+            for (ChunkMetadata chunkMetadata : chunkMetadatas) {
+              if (chunkMetadata.getNumOfPoints() < mergePagePointNum) {
+                isPageEnoughLarge = false;
+                break;
+              }
+            }
+          }
+          if (isPageEnoughLarge) {
+            maxVersion = writeByAppendMerge(maxVersion, device, compactionRateLimiter,
+                readerChunkMetadatasMap, targetResource, writer);
+          } else {
+            maxVersion = writeByDeserializeMerge(maxVersion, device, compactionRateLimiter, entry,
+                targetResource, writer);
           }
         }
         writer.endChunkGroup();
+        writer.writeVersion(maxVersion);
         if (hotCompactionLogger != null) {
           hotCompactionLogger.logDevice(device, writer.getPos());
         }
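
One design note on the two paths: writeByDeserializeMerge collects points from all overlapping chunks into a TreeMap keyed by timestamp, so iteration comes back deduplicated and time-ordered before re-encoding, while writeByAppendMerge copies chunks wholesale. A self-contained sketch of the TreeMap idea, with plain long values standing in for TimeValuePair (an assumed simplification; DeserializeMergeSketch is a hypothetical class):

  import java.util.Map;
  import java.util.TreeMap;

  public class DeserializeMergeSketch {
    public static void main(String[] args) {
      Map<Long, Long> timeValueMap = new TreeMap<>();
      long[][] olderChunk = {{1, 10}, {3, 30}};
      long[][] newerChunk = {{2, 21}, {3, 31}}; // same timestamp t=3 overwrites
      for (long[] p : olderChunk) timeValueMap.put(p[0], p[1]);
      for (long[] p : newerChunk) timeValueMap.put(p[0], p[1]);
      // prints 1=10, 2=21, 3=31 in timestamp order
      timeValueMap.forEach((t, v) -> System.out.println(t + "=" + v));
    }
  }
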
