This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dynamic_compaction by this 
push:
     new 443d080  fix hitter
443d080 is described below

commit 443d0808f9e38961221a59c9e32d4326614f0b3b
Author: EJTTianyu <[email protected]>
AuthorDate: Sun May 23 00:10:33 2021 +0800

    fix hitter
---
 .../HitterLevelCompactionTsFileManagement.java       | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
index 7b004a7..30cdeef 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
@@ -56,6 +56,9 @@ public class HitterLevelCompactionTsFileManagement extends 
LevelCompactionTsFile
   private final int sizeRatio = 
IoTDBDescriptor.getInstance().getConfig().getSizeRatio();
   private final int firstLevelNum = Math
       
.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
+  private final int fullMergeRate =
+      
IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec() -
+          
IoTDBDescriptor.getInstance().getConfig().getHitterMergeWriteThroughputMbPerSec();
   private final String MERGE_SUFFIX = ".temp";
   private boolean isFullMerging = false;
 
@@ -68,6 +71,7 @@ public class HitterLevelCompactionTsFileManagement extends 
LevelCompactionTsFile
       this.mergeFileLst = mergeFileLst;
       this.timePartitionId = timePartitionId;
     }
+
     @Override
     public void run() {
       mergeFull(mergeFileLst, timePartitionId);
@@ -227,9 +231,11 @@ public class HitterLevelCompactionTsFileManagement extends 
LevelCompactionTsFile
           }
         }
       }
-      List<TsFileResource> fullMergeRes = new 
ArrayList<>(mergeResources.get(seqLevelNum - 1));
-      FullMergeTask fullMergeTask = new FullMergeTask(fullMergeRes, 
timePartition);
-      new Thread(fullMergeTask).start();
+      if (fullMergeRate > 0) {
+        List<TsFileResource> fullMergeRes = new 
ArrayList<>(mergeResources.get(seqLevelNum - 1));
+        FullMergeTask fullMergeTask = new FullMergeTask(fullMergeRes, 
timePartition);
+        new Thread(fullMergeTask).start();
+      }
     } catch (Exception e) {
       logger.error("Error occurred in Compaction Merge thread", e);
     } finally {
@@ -415,10 +421,11 @@ public class HitterLevelCompactionTsFileManagement 
extends LevelCompactionTsFile
     Collection<TsFileResource> levelRawTsFileResources = 
(Collection<TsFileResource>) rawTsFileResources
         .get(currMaxLevel - 1);
     List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
-    for (TsFileResource tsFileResource: levelRawTsFileResources) {
+    for (TsFileResource tsFileResource : levelRawTsFileResources) {
       if (tsFileResource.isClosed()) {
         forkedLevelTsFileResources.add(tsFileResource);
-        if (forkedLevelTsFileResources.size() >= firstLevelNum * 
Math.pow(sizeRatio, currMaxLevel - 2)) {
+        if (forkedLevelTsFileResources.size() >= firstLevelNum * Math
+            .pow(sizeRatio, currMaxLevel - 2)) {
           break;
         }
       }
@@ -469,10 +476,11 @@ public class HitterLevelCompactionTsFileManagement 
extends LevelCompactionTsFile
           writeUnlock();
         }
         deleteLevelFilesInDisk(mergeFileLst);
-        isFullMerging = false;
       }
     } catch (Exception e) {
       logger.error("Error occurred in Compaction Merge thread", e);
+    } finally {
+      isFullMerging = false;
     }
   }
 }

Reply via email to