This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dynamic_compaction by this
push:
new 443d080 fix hitter
443d080 is described below
commit 443d0808f9e38961221a59c9e32d4326614f0b3b
Author: EJTTianyu <[email protected]>
AuthorDate: Sun May 23 00:10:33 2021 +0800
fix hitter
---
.../HitterLevelCompactionTsFileManagement.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
index 7b004a7..30cdeef 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
@@ -56,6 +56,9 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
private final int sizeRatio = IoTDBDescriptor.getInstance().getConfig().getSizeRatio();
private final int firstLevelNum = Math
.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
+ private final int fullMergeRate =
+ IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec() -
+ IoTDBDescriptor.getInstance().getConfig().getHitterMergeWriteThroughputMbPerSec();
private final String MERGE_SUFFIX = ".temp";
private boolean isFullMerging = false;
@@ -68,6 +71,7 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
this.mergeFileLst = mergeFileLst;
this.timePartitionId = timePartitionId;
}
+
@Override
public void run() {
mergeFull(mergeFileLst, timePartitionId);
@@ -227,9 +231,11 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
}
}
}
- List<TsFileResource> fullMergeRes = new ArrayList<>(mergeResources.get(seqLevelNum - 1));
- FullMergeTask fullMergeTask = new FullMergeTask(fullMergeRes, timePartition);
- new Thread(fullMergeTask).start();
+ if (fullMergeRate > 0) {
+ List<TsFileResource> fullMergeRes = new ArrayList<>(mergeResources.get(seqLevelNum - 1));
+ FullMergeTask fullMergeTask = new FullMergeTask(fullMergeRes, timePartition);
+ new Thread(fullMergeTask).start();
+ }
} catch (Exception e) {
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
@@ -415,10 +421,11 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
Collection<TsFileResource> levelRawTsFileResources =
(Collection<TsFileResource>) rawTsFileResources
.get(currMaxLevel - 1);
List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
- for (TsFileResource tsFileResource: levelRawTsFileResources) {
+ for (TsFileResource tsFileResource : levelRawTsFileResources) {
if (tsFileResource.isClosed()) {
forkedLevelTsFileResources.add(tsFileResource);
- if (forkedLevelTsFileResources.size() >= firstLevelNum * Math.pow(sizeRatio, currMaxLevel - 2)) {
+ if (forkedLevelTsFileResources.size() >= firstLevelNum * Math
+ .pow(sizeRatio, currMaxLevel - 2)) {
break;
}
}
@@ -469,10 +476,11 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
writeUnlock();
}
deleteLevelFilesInDisk(mergeFileLst);
- isFullMerging = false;
}
} catch (Exception e) {
logger.error("Error occurred in Compaction Merge thread", e);
+ } finally {
+ isFullMerging = false;
}
}
}