This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dynamic_compaction by this
push:
new a3c15e0 complete change
a3c15e0 is described below
commit a3c15e080f5d11ef76576e3c222b2fb218135385
Author: EJTTianyu <[email protected]>
AuthorDate: Sat May 15 23:59:28 2021 +0800
complete change
---
.../resources/conf/iotdb-engine.properties | 6 ++---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 7 +++++-
.../level/LevelCompactionTsFileManagement.java | 9 ++++++++
.../HitterLevelCompactionTsFileManagement.java | 13 +++++++++--
.../engine/heavyhitter/hitter/HashMapHitter.java | 27 +++++++++++++++++++++-
.../engine/storagegroup/StorageGroupProcessor.java | 4 ++--
7 files changed, 58 insertions(+), 10 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index b9a1fc1..52b592b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -368,17 +368,17 @@ force_full_merge=false
compaction_thread_num=10
# The limit of write throughput merge can reach per second
-merge_write_throughput_mb_per_sec=64
+merge_write_throughput_mb_per_sec=1
####################
### Hitter Merge Configurations
####################
# query hitter strategy
-query_hitter_strategy=DEFAULT_STRATEGY
+query_hitter_strategy=HASH_STRATEGY
# max query paths hitter contains
-max_hitter_num=5000
+max_hitter_num=500
# size ratio of the hitter level merge
size_ratio=2
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e49561c..8af6069 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -710,7 +710,7 @@ public class IoTDBConfig {
/**
* whether enable data partition. If disabled, all data belongs to partition 0
*/
- private boolean enablePartition = true;
+ private boolean enablePartition = false;
/**
* whether enable MTree snapshot
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 8bd7155..1cccd7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -604,8 +604,13 @@ public class StorageEngine implements IService {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new StorageEngineException("Current system mode is read only, does
not support merge");
}
+ try {
+ StorageEngine.getInstance().getProcessor(new
PartialPath("root.group_0.d_0")).testMerge();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
- storageGroupProcessor.merge(fullMerge);
+// storageGroupProcessor.merge(fullMerge);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 936e940..9454a14 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -64,6 +64,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
protected final int seqLevelNum = Math
.max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
+ private static int merge_time = 0;
protected final int seqFileNumInEachLevel = Math
.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
protected final int unseqLevelNum = Math
@@ -473,8 +474,12 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
@Override
protected void merge(long timePartition) {
+ long startTimeMillis = System.currentTimeMillis();
merge(forkedSequenceTsFileResources, true, timePartition, seqLevelNum,
seqFileNumInEachLevel);
+ long time = System.currentTimeMillis();
+ merge_time += time - startTimeMillis;
+ logger.info("merge 总时长: {}", merge_time);
if (enableUnseqCompaction && unseqLevelNum <= 1 &&
forkedUnSequenceTsFileResources.size() > 0) {
merge(isForceFullMerge, getTsFileList(true),
forkedUnSequenceTsFileResources.get(0),
Long.MAX_VALUE);
@@ -482,6 +487,10 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
merge(forkedUnSequenceTsFileResources, false, timePartition,
unseqLevelNum,
unseqFileNumInEachLevel);
}
+ this.forkCurrentFileList(timePartition);
+ if (!forkedSequenceTsFileResources.get(0).isEmpty()) {
+ this.merge(timePartition);
+ }
}
@SuppressWarnings("squid:S3776")
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
index 7787c77..4653dec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
@@ -51,6 +51,7 @@ public class HitterLevelCompactionTsFileManagement extends
LevelCompactionTsFile
private static final Logger logger = LoggerFactory
.getLogger(HitterLevelCompactionTsFileManagement.class);
+ private static int merge_time = 0;
private final int sizeRatio =
IoTDBDescriptor.getInstance().getConfig().getSizeRatio();
private final int firstLevelNum = Math
.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
@@ -62,11 +63,19 @@ public class HitterLevelCompactionTsFileManagement extends
LevelCompactionTsFile
@Override
protected void merge(long timePartition) {
- merge(forkedSequenceTsFileResources, timePartition);
+ long startTimeMillis = System.currentTimeMillis();
+ merge1(forkedSequenceTsFileResources, timePartition);
+ long time = System.currentTimeMillis();
+ merge_time += time - startTimeMillis;
+ logger.info("merge 总时长: {}", merge_time);
if (enableUnseqCompaction && forkedUnSequenceTsFileResources.size() > 0) {
merge(isForceFullMerge, getTsFileList(true),
forkedUnSequenceTsFileResources.get(0),
Long.MAX_VALUE);
}
+ this.forkCurrentFileList(timePartition);
+ if (!forkedSequenceTsFileResources.get(0).isEmpty()) {
+ merge(timePartition);
+ }
}
protected void merge1(List<List<TsFileResource>> mergeResources, long
timePartition) {
@@ -373,7 +382,7 @@ public class HitterLevelCompactionTsFileManagement extends
LevelCompactionTsFile
for (TsFileResource tsFileResource : levelRawTsFileResources) {
if (tsFileResource.isClosed()) {
forkedLevelTsFileResources.add(tsFileResource);
- if (forkedLevelTsFileResources.size() > firstLevelNum *
Math.pow(sizeRatio, i)) {
+ if (forkedLevelTsFileResources.size() >= firstLevelNum *
Math.pow(sizeRatio, i)) {
break;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
index 5a91728..cfeeae8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
@@ -24,11 +24,17 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.heavyhitter.QueryHeavyHitters;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.PartialPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +42,15 @@ import org.slf4j.LoggerFactory;
public class HashMapHitter implements QueryHeavyHitters {
private static final Logger logger =
LoggerFactory.getLogger(HashMapHitter.class);
+ int hitter = IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum();
private Map<PartialPath, Integer> counter = new HashMap<>();
+ private PriorityQueue<Entry<PartialPath, Integer>> topHeap = new
PriorityQueue<>(hitter,
+ new Comparator<Entry<PartialPath, Integer>>() {
+ @Override
+ public int compare(Entry<PartialPath, Integer> o1, Entry<PartialPath,
Integer> o2) {
+ return o2.getValue() - o1.getValue();
+ }
+ });
public HashMapHitter(int maxHitterNum) {
@@ -49,7 +63,18 @@ public class HashMapHitter implements QueryHeavyHitters {
@Override
public List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws
MetadataException {
- return null;
+ List<PartialPath> ret = new ArrayList<>();
+ topHeap.addAll(counter.entrySet());
+ List<PartialPath> sgPaths =
MManager.getInstance().getAllTimeseriesPath(sgName);
+ for (int k = 0; k < hitter; k++) {
+ if (!topHeap.isEmpty()) {
+ PartialPath path = topHeap.poll().getKey();
+ if (sgPaths.contains(path)) {
+ ret.add(path);
+ }
+ }
+ }
+ return ret;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1810179..1c1a055 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1746,11 +1746,11 @@ public class StorageGroupProcessor {
public void testMerge() throws Exception {
// fork and filter current tsfile, then commit then to compaction merge
- tsFileManagement.forkCurrentFileList(213524);
+ tsFileManagement.forkCurrentFileList(0);
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
tsFileManagement.new
CompactionMergeTask(this::closeCompactionMergeCallBack,
- 213524));
+ 0));
}