This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8c38f6d Fix unseq compaction file selector conflicts with time partition bug (#2920)
8c38f6d is described below
commit 8c38f6d859f96bafaf8b4935a22fa55225002629
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Wed Mar 31 13:52:05 2021 +0800
Fix unseq compaction file selector conflicts with time partition bug (#2920)
Co-authored-by: zhanglingzhe <[email protected]>
---
.../db/engine/compaction/TsFileManagement.java | 10 +-
.../level/LevelCompactionTsFileManagement.java | 44 +++--
.../no/NoCompactionTsFileManagement.java | 180 ++++++++++++++++-----
.../db/engine/merge/task/MergeMultiChunkTask.java | 1 -
.../engine/merge/MaxFileMergeFileSelectorTest.java | 37 +++++
5 files changed, 221 insertions(+), 51 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 6225735..bc734b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -82,9 +82,17 @@ public abstract class TsFileManagement {
isForceFullMerge = forceFullMerge;
}
- /** get the TsFile list in sequence */
+ /**
+ * get the TsFile list in sequence, not recommend to use this method, use
+ * getTsFileListByTimePartition instead
+ */
+ @Deprecated
public abstract List<TsFileResource> getTsFileList(boolean sequence);
+ /** get the TsFile list in sequence by time partition */
+ public abstract List<TsFileResource> getTsFileListByTimePartition(
+ boolean sequence, long timePartition);
+
/** get the TsFile list iterator in sequence */
public abstract Iterator<TsFileResource> getIterator(boolean sequence);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 9284dfc..86f8148 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -167,24 +167,42 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
}
+ @Deprecated
@Override
public List<TsFileResource> getTsFileList(boolean sequence) {
List<TsFileResource> result = new ArrayList<>();
if (sequence) {
synchronized (sequenceTsFileResources) {
- for (List<SortedSet<TsFileResource>> sequenceTsFileList :
- sequenceTsFileResources.values()) {
- for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
- result.addAll(sequenceTsFileList.get(i));
- }
+ for (long timePartition : sequenceTsFileResources.keySet()) {
+ result.addAll(getTsFileListByTimePartition(true, timePartition));
}
}
} else {
synchronized (unSequenceTsFileResources) {
- for (List<List<TsFileResource>> unSequenceTsFileList :
unSequenceTsFileResources.values()) {
- for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
- result.addAll(unSequenceTsFileList.get(i));
- }
+ for (long timePartition : unSequenceTsFileResources.keySet()) {
+ result.addAll(getTsFileListByTimePartition(false, timePartition));
+ }
+ }
+ }
+ return result;
+ }
+
+ public List<TsFileResource> getTsFileListByTimePartition(boolean sequence,
long timePartition) {
+ List<TsFileResource> result = new ArrayList<>();
+ if (sequence) {
+ synchronized (sequenceTsFileResources) {
+ List<SortedSet<TsFileResource>> sequenceTsFileList =
+ sequenceTsFileResources.get(timePartition);
+ for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
+ result.addAll(sequenceTsFileList.get(i));
+ }
+ }
+ } else {
+ synchronized (unSequenceTsFileResources) {
+ List<List<TsFileResource>> unSequenceTsFileList =
+ unSequenceTsFileResources.get(timePartition);
+ for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
+ result.addAll(unSequenceTsFileList.get(i));
}
}
}
@@ -557,7 +575,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (enableUnseqCompaction && unseqLevelNum <= 1 &&
forkedUnSequenceTsFileResources.size() > 0) {
merge(
isForceFullMerge,
- getTsFileList(true),
+ getTsFileListByTimePartition(true, timePartition),
forkedUnSequenceTsFileResources.get(0),
Long.MAX_VALUE);
} else {
@@ -599,7 +617,11 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
// do not merge current unseq file level to upper level and just
merge all of them to
// seq file
isSeqMerging = false;
- merge(isForceFullMerge, getTsFileList(true),
mergeResources.get(i), Long.MAX_VALUE);
+ merge(
+ isForceFullMerge,
+ getTsFileListByTimePartition(true, timePartition),
+ mergeResources.get(i),
+ Long.MAX_VALUE);
} else {
compactionLogger = new CompactionLogger(storageGroupDir,
storageGroupName);
// log source file list and target file for recover
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 671955a..6d9864d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -26,43 +26,58 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
public class NoCompactionTsFileManagement extends TsFileManagement {
private static final Logger logger =
LoggerFactory.getLogger(NoCompactionTsFileManagement.class);
// includes sealed and unsealed sequence TsFiles
- private TreeSet<TsFileResource> sequenceFileTreeSet =
- new TreeSet<>(
- (o1, o2) -> {
- try {
- int rangeCompare =
- Long.compare(
- Long.parseLong(o1.getTsFile().getParentFile().getName()),
-
Long.parseLong(o2.getTsFile().getParentFile().getName()));
- return rangeCompare == 0
- ? compareFileName(o1.getTsFile(), o2.getTsFile())
- : rangeCompare;
- } catch (NumberFormatException e) {
- return compareFileName(o1.getTsFile(), o2.getTsFile());
- }
- });
+ private final Map<Long, TreeSet<TsFileResource>> sequenceFileTreeSetMap =
new TreeMap<>();
// includes sealed and unsealed unSequence TsFiles
- private List<TsFileResource> unSequenceFileList = new ArrayList<>();
+ private final Map<Long, List<TsFileResource>> unSequenceFileListMap = new
TreeMap<>();
public NoCompactionTsFileManagement(String storageGroupName, String
storageGroupDir) {
super(storageGroupName, storageGroupDir);
}
+ @Deprecated
@Override
public List<TsFileResource> getTsFileList(boolean sequence) {
+ List<TsFileResource> result = new ArrayList<>();
if (sequence) {
- return new ArrayList<>(sequenceFileTreeSet);
+ synchronized (sequenceFileTreeSetMap) {
+ for (TreeSet<TsFileResource> tsFileResourceTreeSet :
sequenceFileTreeSetMap.values()) {
+ result.addAll(tsFileResourceTreeSet);
+ }
+ }
} else {
- return new ArrayList<>(unSequenceFileList);
+ synchronized (unSequenceFileListMap) {
+ for (List<TsFileResource> tsFileResourceList :
unSequenceFileListMap.values()) {
+ result.addAll(tsFileResourceList);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public List<TsFileResource> getTsFileListByTimePartition(boolean sequence,
long timePartition) {
+ if (sequence) {
+ synchronized (sequenceFileTreeSetMap) {
+ return new
ArrayList<>(sequenceFileTreeSetMap.getOrDefault(timePartition, new
TreeSet<>()));
+ }
+ } else {
+ synchronized (unSequenceFileListMap) {
+ return new ArrayList<>(
+ unSequenceFileListMap.getOrDefault(timePartition,
Collections.emptyList()));
+ }
}
}
@@ -74,27 +89,79 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
@Override
public void remove(TsFileResource tsFileResource, boolean sequence) {
if (sequence) {
- sequenceFileTreeSet.remove(tsFileResource);
+ synchronized (sequenceFileTreeSetMap) {
+ TreeSet<TsFileResource> sequenceFileTreeSet =
+ sequenceFileTreeSetMap.get(tsFileResource.getTimePartition());
+ sequenceFileTreeSet.remove(tsFileResource);
+ }
} else {
- unSequenceFileList.remove(tsFileResource);
+ synchronized (unSequenceFileListMap) {
+ List<TsFileResource> unSequenceFileList =
+ unSequenceFileListMap.get(tsFileResource.getTimePartition());
+ unSequenceFileList.remove(tsFileResource);
+ }
}
}
@Override
public void removeAll(List<TsFileResource> tsFileResourceList, boolean
sequence) {
- if (sequence) {
- sequenceFileTreeSet.removeAll(tsFileResourceList);
- } else {
- unSequenceFileList.removeAll(tsFileResourceList);
+ if (tsFileResourceList.size() > 0) {
+ tsFileResourceList.sort((o1, o2) -> (int) (o1.getTimePartition() -
o2.getTimePartition()));
+ if (sequence) {
+ synchronized (sequenceFileTreeSetMap) {
+ long currTimePartition =
tsFileResourceList.get(0).getTimePartition();
+ int startIndex = 0;
+ for (int i = 1; i < tsFileResourceList.size(); i++) {
+ TsFileResource tsFileResource = tsFileResourceList.get(i);
+ if (tsFileResource.getTimePartition() != currTimePartition) {
+ sequenceFileTreeSetMap
+ .get(currTimePartition)
+ .removeAll(tsFileResourceList.subList(startIndex, i));
+ currTimePartition = tsFileResource.getTimePartition();
+ startIndex = i;
+ }
+ }
+ sequenceFileTreeSetMap
+ .get(currTimePartition)
+ .removeAll(tsFileResourceList.subList(startIndex,
tsFileResourceList.size()));
+ }
+ } else {
+ synchronized (unSequenceFileListMap) {
+ long currTimePartition =
tsFileResourceList.get(0).getTimePartition();
+ int startIndex = 0;
+ for (int i = 1; i < tsFileResourceList.size(); i++) {
+ TsFileResource tsFileResource = tsFileResourceList.get(i);
+ if (tsFileResource.getTimePartition() != currTimePartition) {
+ unSequenceFileListMap
+ .get(currTimePartition)
+ .removeAll(tsFileResourceList.subList(startIndex, i));
+ currTimePartition = tsFileResource.getTimePartition();
+ startIndex = i;
+ }
+ }
+ unSequenceFileListMap
+ .get(currTimePartition)
+ .removeAll(tsFileResourceList.subList(startIndex,
tsFileResourceList.size()));
+ }
+ }
}
}
@Override
public void add(TsFileResource tsFileResource, boolean sequence) {
+ long timePartitionId = tsFileResource.getTimePartition();
if (sequence) {
- sequenceFileTreeSet.add(tsFileResource);
+ synchronized (sequenceFileTreeSetMap) {
+ sequenceFileTreeSetMap
+ .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
+ .add(tsFileResource);
+ }
} else {
- unSequenceFileList.add(tsFileResource);
+ synchronized (unSequenceFileListMap) {
+ unSequenceFileListMap
+ .computeIfAbsent(timePartitionId,
this::newUnSequenceTsFileResources)
+ .add(tsFileResource);
+ }
}
}
@@ -105,44 +172,73 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
@Override
public void addAll(List<TsFileResource> tsFileResourceList, boolean
sequence) {
- if (sequence) {
- sequenceFileTreeSet.addAll(tsFileResourceList);
- } else {
- unSequenceFileList.addAll(tsFileResourceList);
+ for (TsFileResource tsFileResource : tsFileResourceList) {
+ add(tsFileResource, sequence);
}
}
@Override
public boolean contains(TsFileResource tsFileResource, boolean sequence) {
if (sequence) {
- return sequenceFileTreeSet.contains(tsFileResource);
+ synchronized (sequenceFileTreeSetMap) {
+ return sequenceFileTreeSetMap
+ .getOrDefault(tsFileResource.getTimePartition(),
newSequenceTsFileResources(0L))
+ .contains(tsFileResource);
+ }
} else {
- return unSequenceFileList.contains(tsFileResource);
+ synchronized (unSequenceFileListMap) {
+ return unSequenceFileListMap
+ .getOrDefault(tsFileResource.getTimePartition(), new ArrayList<>())
+ .contains(tsFileResource);
+ }
}
}
@Override
public void clear() {
- sequenceFileTreeSet.clear();
- unSequenceFileList.clear();
+ sequenceFileTreeSetMap.clear();
+ unSequenceFileListMap.clear();
}
@Override
public boolean isEmpty(boolean sequence) {
if (sequence) {
- return sequenceFileTreeSet.isEmpty();
+ synchronized (sequenceFileTreeSetMap) {
+ for (Set<TsFileResource> sequenceFileTreeSet :
sequenceFileTreeSetMap.values()) {
+ if (!sequenceFileTreeSet.isEmpty()) {
+ return false;
+ }
+ }
+ }
} else {
- return unSequenceFileList.isEmpty();
+ synchronized (unSequenceFileListMap) {
+ for (List<TsFileResource> unSequenceFileList :
unSequenceFileListMap.values()) {
+ if (!unSequenceFileList.isEmpty()) {
+ return false;
+ }
+ }
+ }
}
+ return true;
}
@Override
public int size(boolean sequence) {
+ int result = 0;
if (sequence) {
- return sequenceFileTreeSet.size();
+ synchronized (sequenceFileTreeSetMap) {
+ for (Set<TsFileResource> sequenceFileTreeSet :
sequenceFileTreeSetMap.values()) {
+ result += sequenceFileTreeSet.size();
+ }
+ }
} else {
- return unSequenceFileList.size();
+ synchronized (unSequenceFileListMap) {
+ for (List<TsFileResource> unSequenceFileList :
unSequenceFileListMap.values()) {
+ result += unSequenceFileList.size();
+ }
+ }
}
+ return result;
}
@Override
@@ -159,4 +255,12 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
protected void merge(long timePartition) {
logger.info("{} no merge logic", storageGroupName);
}
+
+ private TreeSet<TsFileResource> newSequenceTsFileResources(Long k) {
+ return new TreeSet<>((o1, o2) -> compareFileName(o1.getTsFile(),
o2.getTsFile()));
+ }
+
+ private List<TsFileResource> newUnSequenceTsFileResources(Long k) {
+ return new ArrayList<>();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 727d9e2..2b4bb4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -553,7 +553,6 @@ public class MergeMultiChunkTask {
for (int i = 0; i < batchData.length(); i++) {
long time = batchData.getTimeByIndex(i);
// merge data in batch and data in unseqReader
-
boolean overwriteSeqPoint = false;
// unseq point.time <= sequence point.time, write unseq point
while (currTimeValuePairs[pathIdx] != null
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
index b77f2ce..14c72d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
@@ -199,4 +199,41 @@ public class MaxFileMergeFileSelectorTest extends MergeTest {
assertEquals(1, mergeResource.getUnseqFiles().size());
mergeResource.clear();
}
+
+ /**
+ * test unseq merge select with the following files: {0seq-0-0-0.tsfile
0-100 1seq-1-1-0.tsfile
+ * 100-200 2seq-2-2-0.tsfile 200-300 3seq-3-3-0.tsfile 300-400
4seq-4-4-0.tsfile 400-500}
+ * {10unseq-10-10-0.tsfile 0-101}
+ */
+ @Test
+ public void testFileSelectionAboutLastSeqFile()
+ throws MergeException, IOException, WriteProcessException {
+ File file =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 10
+ + "unseq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource largeUnseqTsFileResource = new TsFileResource(file);
+ largeUnseqTsFileResource.setClosed(true);
+ largeUnseqTsFileResource.setMinPlanIndex(10);
+ largeUnseqTsFileResource.setMaxPlanIndex(10);
+ largeUnseqTsFileResource.setVersion(10);
+ prepareFile(largeUnseqTsFileResource, 0, ptNum + 1, 0);
+
+ unseqResources.clear();
+ unseqResources.add(largeUnseqTsFileResource);
+
+ MergeResource resource = new MergeResource(seqResources, unseqResources);
+ IMergeFileSelector mergeFileSelector = new
MaxFileMergeFileSelector(resource, Long.MAX_VALUE);
+ List[] result = mergeFileSelector.select();
+ assertEquals(2, result[0].size());
+ resource.clear();
+ }
}