This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5980e44e08057c7c797c9a02f3a96f4a5b16dd18
Author: CRZbulabula <[email protected]>
AuthorDate: Tue Sep 29 23:16:15 2020 +0800

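    Change merge strategy
    
    Use the approximate average number of points per device (the device
    count is estimated with HyperLogLog) to decide whether to merge all
    level files into the last level.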
---
 .../level/LevelTsFileManagement.java               | 149 ++++++++++++---------
 1 file changed, 86 insertions(+), 63 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
index 2ad1de0..2b852d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
@@ -40,7 +40,11 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.cache.FileChunkPointSizeCache;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -69,10 +73,14 @@ public class LevelTsFileManagement extends TsFileManagement {
   private final Map<Long, List<List<TsFileResource>>> 
unSequenceTsFileResources = new ConcurrentSkipListMap<>();
   private final List<List<TsFileResource>> forkedSequenceTsFileResources = new 
ArrayList<>();
   private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = 
new ArrayList<>();
-  private long forkedSeqListPointNum = 0;
-  private Set<String> forkedSeqListDeviceSet = new HashSet<>();
-  private long forkedUnSeqListPointNum = 0;
-  private Set<String> forkedUnSeqListDeviceSet = new HashSet<>();
+
+  // Deciding whether or not to merge;
+  //private boolean forkedSeqListMergeFlag = false;
+  //private boolean forkedUnSeqListMergeFlag = false;
+  private double forkedSeqListPointNum = 0;
+  private double forkedSeqListDeviceSize = 0;
+  private double forkedUnSeqListPointNum = 0;
+  private double forkedUnSeqListDeviceSize = 0;
 
   public LevelTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
@@ -111,7 +119,7 @@ public class LevelTsFileManagement extends TsFileManagement {
     File newTargetFile = createNewTsFileName(sourceFile.getTsFile(), 
maxLevelNum - 1);
     TsFileResource targetResource = new TsFileResource(newTargetFile);
     List<TsFileResource> mergeFiles = new ArrayList<>();
-    for (int i = currMergeFiles.size(); i >= 0; i--) {
+    for (int i = currMergeFiles.size() - 1; i >= 0; i--) {
       mergeFiles.addAll(sequenceTsFileResources.get(timePartitionId).get(i));
     }
     HotCompactionUtils.merge(targetResource, mergeFiles,
@@ -403,17 +411,68 @@ public class LevelTsFileManagement extends TsFileManagement {
 
   @Override
   public void forkCurrentFileList(long timePartition) {
-    Pair<Long, Set<String>> seqResult = forkTsFileList(
+    Pair<Double, Double> seqResult = forkTsFileList( // left: point num, right: approximate (HyperLogLog) device num
         forkedSequenceTsFileResources,
         sequenceTsFileResources.computeIfAbsent(timePartition, 
this::newSequenceTsFileResources));
     forkedSeqListPointNum = seqResult.left;
-    forkedSeqListDeviceSet = seqResult.right;
-    Pair<Long, Set<String>> unSeqResult = forkTsFileList(
+    forkedSeqListDeviceSize = seqResult.right;
+    Pair<Double, Double> unSeqResult = forkTsFileList( // same (point num, approximate device num) pair, for unsequence files
         forkedUnSequenceTsFileResources,
-        unSequenceTsFileResources
-            .computeIfAbsent(timePartition, 
this::newUnSequenceTsFileResources));
+        unSequenceTsFileResources.computeIfAbsent(timePartition, 
this::newUnSequenceTsFileResources));
     forkedUnSeqListPointNum = unSeqResult.left;
-    forkedUnSeqListDeviceSet = unSeqResult.right;
+    forkedUnSeqListDeviceSize = unSeqResult.right;
+  }
+
+  /**
+   * Forks the closed TsFiles of the first {@code maxLevelNum - 1} levels into
+   * {@code forkedTsFileResources}, stopping as soon as the estimated average number of points
+   * per device reaches {@code maxChunkPointNum}. The device count comes from a HyperLogLog
+   * sketch, so it is an approximation. The forked list is padded with empty levels until it
+   * holds {@code maxLevelNum} entries.
+   *
+   * @param forkedTsFileResources output list; cleared before being filled
+   * @param rawTsFileResources per-level TsFile collections to fork from
+   * @return (total point num, approximate device num) of the forked files
+   */
+  private Pair<Double, Double> forkTsFileList(
+      List<List<TsFileResource>> forkedTsFileResources,
+      List<? extends Collection<TsFileResource>> rawTsFileResources) {
+    forkedTsFileResources.clear();
+    long pointNum = 0;
+    ICardinality deviceSet = new HyperLogLog(13);
+    boolean thresholdReached = false;
+    for (int i = 0; i < maxLevelNum - 1 && !thresholdReached; i++) {
+      List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
+      Collection<TsFileResource> levelRawTsFileResources = rawTsFileResources.get(i);
+      synchronized (levelRawTsFileResources) {
+        for (TsFileResource tsFileResource : levelRawTsFileResources) {
+          // only closed files may be merged, as in the implementation this replaces
+          if (!tsFileResource.isClosed()) {
+            continue;
+          }
+          Map<String, Long> chunkPointMap = FileChunkPointSizeCache.getInstance()
+              .get(tsFileResource.getTsFile());
+          for (Entry<String, Long> deviceChunkPoint : chunkPointMap.entrySet()) {
+            deviceSet.offer(deviceChunkPoint.getKey());
+            pointNum += deviceChunkPoint.getValue();
+          }
+          forkedLevelTsFileResources.add(tsFileResource);
+          // cardinality() may recompute the estimate on every call; evaluate it once per file
+          long deviceNum = deviceSet.cardinality();
+          if (deviceNum > 0 && pointNum / deviceNum >= maxChunkPointNum) {
+            thresholdReached = true;
+            break;
+          }
+        }
+      }
+      forkedTsFileResources.add(forkedLevelTsFileResources);
+    }
+
+    // pad with empty levels until the forked list holds maxLevelNum entries
+    while (forkedTsFileResources.size() < maxLevelNum) {
+      forkedTsFileResources.add(new ArrayList<>());
+    }
+    return new Pair<>((double) pointNum, (double) deviceSet.cardinality());
   }
 
 //  private Pair<Long, Set<String>> forkTsFileList(
@@ -431,52 +490,15 @@ public class LevelTsFileManagement extends TsFileManagement {
 //      synchronized (levelRawTsFileResources) {
 //        for (TsFileResource tsFileResource : levelRawTsFileResources) {
 //          if (tsFileResource.isClosed()) {
-//            Map<String, Long> chunkPointMap = 
FileChunkPointSizeCache.getInstance()
-//                .get(tsFileResource.getTsFile());
-//            for (Entry<String, Long> deviceChunkPoint : 
chunkPointMap.entrySet()) {
-//              deviceSet.add(deviceChunkPoint.getKey());
-//              pointNum += deviceChunkPoint.getValue();
-//            }
-//          }
-//          if (deviceSet.size() > 0
-//              && pointNum / deviceSet.size() >= maxChunkPointNum) {
-//            break;
+//            forkedLevelTsFileResources.add(tsFileResource);
 //          }
 //        }
 //      }
-//      if (deviceSet.size() > 0
-//          && pointNum / deviceSet.size() >= maxChunkPointNum) {
-//        break;
-//      }
 //      forkedTsFileResources.add(forkedLevelTsFileResources);
 //    }
 //    return new Pair<>(pointNum, deviceSet);
 //  }
 
-  private Pair<Long, Set<String>> forkTsFileList(
-      List<List<TsFileResource>> forkedTsFileResources,
-      List rawTsFileResources) {
-    forkedTsFileResources.clear();
-    // just fork part of the TsFile list, controlled by max_merge_chunk_point
-    long pointNum = 0;
-    // all flush to target file
-    Set<String> deviceSet = new HashSet<>();
-    for (int i = 0; i < maxLevelNum - 1; i++) {
-      List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
-      Collection<TsFileResource> levelRawTsFileResources = 
(Collection<TsFileResource>) rawTsFileResources
-          .get(i);
-      synchronized (levelRawTsFileResources) {
-        for (TsFileResource tsFileResource : levelRawTsFileResources) {
-          if (tsFileResource.isClosed()) {
-            forkedLevelTsFileResources.add(tsFileResource);
-          }
-        }
-      }
-      forkedTsFileResources.add(forkedLevelTsFileResources);
-    }
-    return new Pair<>(pointNum, deviceSet);
-  }
-
   @Override
   protected void merge(long timePartition) {
     merge(forkedSequenceTsFileResources, true, timePartition);
@@ -489,21 +511,22 @@ public class LevelTsFileManagement extends TsFileManagement {
     long startTimeMillis = System.currentTimeMillis();
     try {
       logger.info("{} start to filter hot compaction condition", 
storageGroupName);
-      long pointNum = sequence ? forkedSeqListPointNum : 
forkedUnSeqListPointNum;
-      Set<String> deviceSet =
-          sequence ? forkedSeqListDeviceSet : forkedUnSeqListDeviceSet;
-//      logger
-//          .info("{} current sg subLevel point num: {}, device num: {}", 
storageGroupName, pointNum,
-//              deviceSet.size());
+      double pointNum = sequence ? forkedSeqListPointNum : 
forkedUnSeqListPointNum;
+      double deviceSize =
+          sequence ? forkedSeqListDeviceSize : forkedSeqListDeviceSize;
+      //boolean mergeFlag = sequence ? forkedSeqListMergeFlag : 
forkedUnSeqListMergeFlag;
+
+      logger
+          .info("{} current sg subLevel point num: {}, approximate device num: 
{}", storageGroupName, pointNum,
+                  deviceSize);
       HotCompactionLogger hotCompactionLogger = new 
HotCompactionLogger(storageGroupDir,
           storageGroupName);
-//      if (deviceSet.size() > 0 && pointNum / deviceSet.size() > 
IoTDBDescriptor.getInstance()
-//          .getConfig().getMergeChunkPointNumberThreshold()) {
-//        // merge all tsfile to last level
-//        logger.info("{} merge {} level tsfiles to next level", 
storageGroupName,
-//            mergeResources.size());
-//        flushAllFilesToLastLevel(timePartition, mergeResources, 
hotCompactionLogger, sequence);
-//      } else {
+      if (deviceSize > 0 && pointNum / deviceSize >= maxChunkPointNum) {
+        // merge all tsfile to last level
+        logger.info("{} merge {} level tsfiles to next level", 
storageGroupName,
+            mergeResources.size());
+        flushAllFilesToLastLevel(timePartition, mergeResources, 
hotCompactionLogger, sequence);
+      } else {
       for (int i = 0; i < maxLevelNum - 1; i++) {
         if (maxFileNumInEachLevel <= mergeResources.get(i).size()) {
           for (TsFileResource mergeResource : mergeResources.get(i)) {
@@ -537,7 +560,7 @@ public class LevelTsFileManagement extends TsFileManagement {
           }
         }
       }
-//      }
+      }
       hotCompactionLogger.close();
       File logFile = FSFactoryProducer.getFSFactory()
           .getFile(storageGroupDir, storageGroupName + 
HOT_COMPACTION_LOG_NAME);

Reply via email to