This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dynamic_compaction by this push:
new 59217c5 fix full merge in dynamic compaction
59217c5 is described below
commit 59217c5320c0d399292c02897222a9d8d87245ab
Author: EJTTianyu <[email protected]>
AuthorDate: Fri May 21 11:46:41 2021 +0800
fix full merge in dynamic compaction
---
.../HitterLevelCompactionTsFileManagement.java | 6 ++--
.../engine/heavyhitter/hitter/HashMapHitter.java | 34 +++++++++++++++-------
2 files changed, 27 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
index 6292903..2761feb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
@@ -227,7 +227,7 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
}
}
}
- List<TsFileResource> fullMergeRes = new ArrayList<>(mergeResources.get(seqLevelNum - 2));
+ List<TsFileResource> fullMergeRes = new ArrayList<>(mergeResources.get(seqLevelNum - 1));
FullMergeTask fullMergeTask = new FullMergeTask(fullMergeRes, timePartition);
new Thread(fullMergeTask).start();
} catch (Exception e) {
@@ -463,8 +463,8 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
new HashSet<>(), true);
writeLock();
try {
- sequenceTsFileResources.get(timePartitionId).get(seqLevelNum - 1).add(newResource);
- deleteLevelFilesInList(timePartitionId, mergeFileLst, seqLevelNum - 2, true);
+ sequenceTsFileResources.get(timePartitionId).get(seqLevelNum).add(newResource);
+ deleteLevelFilesInList(timePartitionId, mergeFileLst, seqLevelNum - 1, true);
} finally {
writeUnlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
index cfeeae8..8f98118 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
@@ -31,6 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.heavyhitter.QueryHeavyHitters;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -42,6 +44,7 @@ import org.slf4j.LoggerFactory;
public class HashMapHitter implements QueryHeavyHitters {
private static final Logger logger = LoggerFactory.getLogger(HashMapHitter.class);
+ private final ReadWriteLock hitterLock = new ReentrantReadWriteLock();
int hitter = IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum();
private Map<PartialPath, Integer> counter = new HashMap<>();
private PriorityQueue<Entry<PartialPath, Integer>> topHeap = new PriorityQueue<>(hitter,
@@ -58,23 +61,34 @@ public class HashMapHitter implements QueryHeavyHitters {
@Override
public void acceptQuerySeries(PartialPath queryPath) {
- counter.put(queryPath, counter.getOrDefault(queryPath, 0) + 1);
+ hitterLock.writeLock().lock();
+ try {
+ counter.put(queryPath, counter.getOrDefault(queryPath, 0) + 1);
+ } finally {
+ hitterLock.writeLock().unlock();
+ }
}
@Override
public List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws MetadataException {
- List<PartialPath> ret = new ArrayList<>();
- topHeap.addAll(counter.entrySet());
- List<PartialPath> sgPaths = MManager.getInstance().getAllTimeseriesPath(sgName);
- for (int k = 0; k < hitter; k++) {
- if (!topHeap.isEmpty()) {
- PartialPath path = topHeap.poll().getKey();
- if (sgPaths.contains(path)) {
- ret.add(path);
+ hitterLock.readLock().lock();
+ try {
+ List<PartialPath> ret = new ArrayList<>();
+ topHeap.addAll(counter.entrySet());
+ List<PartialPath> sgPaths = MManager.getInstance().getAllTimeseriesPath(sgName);
+ for (int k = 0; k < hitter; k++) {
+ if (!topHeap.isEmpty()) {
+ PartialPath path = topHeap.poll().getKey();
+ if (sgPaths.contains(path)) {
+ ret.add(path);
+ }
}
}
+ topHeap.clear();
+ return ret;
+ } finally {
+ hitterLock.readLock().unlock();
}
- return ret;
}
/**