This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dynamic_compaction by this 
push:
     new a3c15e0  complete change
a3c15e0 is described below

commit a3c15e080f5d11ef76576e3c222b2fb218135385
Author: EJTTianyu <[email protected]>
AuthorDate: Sat May 15 23:59:28 2021 +0800

    complete change
---
 .../resources/conf/iotdb-engine.properties         |  6 ++---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  7 +++++-
 .../level/LevelCompactionTsFileManagement.java     |  9 ++++++++
 .../HitterLevelCompactionTsFileManagement.java     | 13 +++++++++--
 .../engine/heavyhitter/hitter/HashMapHitter.java   | 27 +++++++++++++++++++++-
 .../engine/storagegroup/StorageGroupProcessor.java |  4 ++--
 7 files changed, 58 insertions(+), 10 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index b9a1fc1..52b592b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -368,17 +368,17 @@ force_full_merge=false
 compaction_thread_num=10
 
 # The limit of write throughput merge can reach per second
-merge_write_throughput_mb_per_sec=64
+merge_write_throughput_mb_per_sec=1
 
 ####################
 ### Hitter Merge Configurations
 ####################
 
 # query hitter strategy
-query_hitter_strategy=DEFAULT_STRATEGY
+query_hitter_strategy=HASH_STRATEGY
 
 # max query paths hitter contains
-max_hitter_num=5000
+max_hitter_num=500
 
 # size ratio of the hitter level merge
 size_ratio=2
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e49561c..8af6069 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -710,7 +710,7 @@ public class IoTDBConfig {
   /**
    * whether enable data partition. If disabled, all data belongs to partition 0
    */
-  private boolean enablePartition = true;
+  private boolean enablePartition = false;
 
   /**
    * whether enable MTree snapshot
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 8bd7155..1cccd7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -604,8 +604,13 @@ public class StorageEngine implements IService {
     if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
       throw new StorageEngineException("Current system mode is read only, does 
not support merge");
     }
+    try {
+      StorageEngine.getInstance().getProcessor(new 
PartialPath("root.group_0.d_0")).testMerge();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
     for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
-      storageGroupProcessor.merge(fullMerge);
+//      storageGroupProcessor.merge(fullMerge);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 936e940..9454a14 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -64,6 +64,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
 
   protected final int seqLevelNum = Math
       .max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
+  private static int merge_time = 0;
   protected final int seqFileNumInEachLevel = Math
       
.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
   protected final int unseqLevelNum = Math
@@ -473,8 +474,12 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
 
   @Override
   protected void merge(long timePartition) {
+    long startTimeMillis = System.currentTimeMillis();
     merge(forkedSequenceTsFileResources, true, timePartition, seqLevelNum,
         seqFileNumInEachLevel);
+    long time = System.currentTimeMillis();
+    merge_time += time - startTimeMillis;
+    logger.info("merge 总时长: {}", merge_time);
     if (enableUnseqCompaction && unseqLevelNum <= 1 && 
forkedUnSequenceTsFileResources.size() > 0) {
       merge(isForceFullMerge, getTsFileList(true), 
forkedUnSequenceTsFileResources.get(0),
           Long.MAX_VALUE);
@@ -482,6 +487,10 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
       merge(forkedUnSequenceTsFileResources, false, timePartition, 
unseqLevelNum,
           unseqFileNumInEachLevel);
     }
+    this.forkCurrentFileList(timePartition);
+    if (!forkedSequenceTsFileResources.get(0).isEmpty()) {
+      this.merge(timePartition);
+    }
   }
 
   @SuppressWarnings("squid:S3776")
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
index 7787c77..4653dec 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
@@ -51,6 +51,7 @@ public class HitterLevelCompactionTsFileManagement extends 
LevelCompactionTsFile
 
   private static final Logger logger = LoggerFactory
       .getLogger(HitterLevelCompactionTsFileManagement.class);
+  private static int merge_time = 0;
   private final int sizeRatio = 
IoTDBDescriptor.getInstance().getConfig().getSizeRatio();
   private final int firstLevelNum = Math
       
.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
@@ -62,11 +63,19 @@ public class HitterLevelCompactionTsFileManagement extends 
LevelCompactionTsFile
 
   @Override
   protected void merge(long timePartition) {
-    merge(forkedSequenceTsFileResources, timePartition);
+    long startTimeMillis = System.currentTimeMillis();
+    merge1(forkedSequenceTsFileResources, timePartition);
+    long time = System.currentTimeMillis();
+    merge_time += time - startTimeMillis;
+    logger.info("merge 总时长: {}", merge_time);
     if (enableUnseqCompaction && forkedUnSequenceTsFileResources.size() > 0) {
       merge(isForceFullMerge, getTsFileList(true), 
forkedUnSequenceTsFileResources.get(0),
           Long.MAX_VALUE);
     }
+    this.forkCurrentFileList(timePartition);
+    if (!forkedSequenceTsFileResources.get(0).isEmpty()) {
+      merge(timePartition);
+    }
   }
 
   protected void merge1(List<List<TsFileResource>> mergeResources, long 
timePartition) {
@@ -373,7 +382,7 @@ public class HitterLevelCompactionTsFileManagement extends 
LevelCompactionTsFile
       for (TsFileResource tsFileResource : levelRawTsFileResources) {
         if (tsFileResource.isClosed()) {
           forkedLevelTsFileResources.add(tsFileResource);
-          if (forkedLevelTsFileResources.size() > firstLevelNum * 
Math.pow(sizeRatio, i)) {
+          if (forkedLevelTsFileResources.size() >= firstLevelNum * 
Math.pow(sizeRatio, i)) {
             break;
           }
         }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
index 5a91728..cfeeae8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
@@ -24,11 +24,17 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.heavyhitter.QueryHeavyHitters;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +42,15 @@ import org.slf4j.LoggerFactory;
 public class HashMapHitter implements QueryHeavyHitters {
 
   private static final Logger logger = 
LoggerFactory.getLogger(HashMapHitter.class);
+  int hitter = IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum();
   private Map<PartialPath, Integer> counter = new HashMap<>();
+  private PriorityQueue<Entry<PartialPath, Integer>> topHeap = new 
PriorityQueue<>(hitter,
+      new Comparator<Entry<PartialPath, Integer>>() {
+        @Override
+        public int compare(Entry<PartialPath, Integer> o1, Entry<PartialPath, 
Integer> o2) {
+          return o2.getValue() - o1.getValue();
+        }
+      });
 
   public HashMapHitter(int maxHitterNum) {
 
@@ -49,7 +63,18 @@ public class HashMapHitter implements QueryHeavyHitters {
 
   @Override
   public List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws 
MetadataException {
-    return null;
+    List<PartialPath> ret = new ArrayList<>();
+    topHeap.addAll(counter.entrySet());
+    List<PartialPath> sgPaths = 
MManager.getInstance().getAllTimeseriesPath(sgName);
+    for (int k = 0; k < hitter; k++) {
+      if (!topHeap.isEmpty()) {
+        PartialPath path =  topHeap.poll().getKey();
+        if (sgPaths.contains(path)) {
+          ret.add(path);
+        }
+      }
+    }
+    return ret;
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1810179..1c1a055 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1746,11 +1746,11 @@ public class StorageGroupProcessor {
 
   public void testMerge() throws Exception {
     // fork and filter current tsfile, then commit then to compaction merge
-    tsFileManagement.forkCurrentFileList(213524);
+    tsFileManagement.forkCurrentFileList(0);
     CompactionMergeTaskPoolManager.getInstance()
         .submitTask(
             tsFileManagement.new 
CompactionMergeTask(this::closeCompactionMergeCallBack,
-                213524));
+                0));
   }
 
 

Reply via email to