This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 1e323fd [To rel/0.12][IOTDB-2584]Fix cross space compaction selector
(#5101)
1e323fd is described below
commit 1e323fdb34544e4ddaca6f0d067c08425e336414
Author: 周沛辰 <[email protected]>
AuthorDate: Tue Feb 22 23:03:51 2022 +0800
[To rel/0.12][IOTDB-2584]Fix cross space compaction selector (#5101)
---
.../db/engine/compaction/TsFileManagement.java | 8 -
.../db/engine/merge/manage/MergeResource.java | 39 ++-
.../merge/selector/MaxFileMergeFileSelector.java | 73 +++--
.../db/engine/storagegroup/TsFileResource.java | 7 +-
.../engine/merge/MaxFileMergeFileSelectorTest.java | 324 +++++++++++++++++++++
.../iotdb/db/engine/merge/MergeTaskTest.java | 8 +-
6 files changed, 409 insertions(+), 50 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index a2f18b9..ccfb35d 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -228,14 +228,6 @@ public abstract class TsFileManagement {
return false;
}
- if (unSeqMergeList.size() > maxOpenFileNumInEachUnseqCompaction) {
- logger.info(
- "{} too much unseq files to be merged, reduce it to {}",
- storageGroupName,
- maxOpenFileNumInEachUnseqCompaction);
- unSeqMergeList = unSeqMergeList.subList(0,
maxOpenFileNumInEachUnseqCompaction);
- }
-
long budget =
IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
long timeLowerBound = System.currentTimeMillis() - dataTTL;
MergeResource mergeResource = new MergeResource(seqMergeList,
unSeqMergeList, timeLowerBound);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index c347c48..7812f20 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -60,7 +60,7 @@ import static
org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
public class MergeResource {
private List<TsFileResource> seqFiles;
- private List<TsFileResource> unseqFiles;
+ private List<TsFileResource> unseqFiles = new ArrayList<>();
private Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new
HashMap<>();
private Map<TsFileResource, RestorableTsFileIOWriter> fileWriterCache = new
HashMap<>();
@@ -74,23 +74,36 @@ public class MergeResource {
private boolean cacheDeviceMeta = false;
public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource>
unseqFiles) {
- this.seqFiles =
seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
- this.unseqFiles =
unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
- }
-
- /** If returns true, it means to participate in the merge */
- private boolean filterResource(TsFileResource res) {
- return res.getTsFile().exists()
- && !res.isDeleted()
- && (!res.isClosed() || res.stillLives(timeLowerBound))
- && !res.isMerging();
+ this.seqFiles =
seqFiles.stream().filter(this::filterSeqResource).collect(Collectors.toList());
+ filterUnseqResource(unseqFiles);
}
public MergeResource(
Collection<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
long timeLowerBound) {
this.timeLowerBound = timeLowerBound;
- this.seqFiles =
seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
- this.unseqFiles =
unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
+ this.seqFiles =
seqFiles.stream().filter(this::filterSeqResource).collect(Collectors.toList());
+ filterUnseqResource(unseqFiles);
+ }
+
+ /** Filter the seq files into the compaction. Seq files should not be
deleted or over ttl. */
+ private boolean filterSeqResource(TsFileResource res) {
+ return !res.isDeleted() && res.stillLives(timeLowerBound);
+ }
+
+ /**
+ * Filter the unseq files into the compaction. Unseq files should not be
deleted or over ttl. To
+ * ensure that the compaction is correct, return as soon as it encounters
the file being compacted
+ * or not closed. Therefore, a cross space compaction can only be performed
serially under a time
+ * partition in a VSG.
+ */
+ private void filterUnseqResource(List<TsFileResource> unseqResources) {
+ for (TsFileResource resource : unseqResources) {
+ if (resource.isMerging() || !resource.isClosed()) {
+ return;
+ } else if (!resource.isDeleted() && resource.stillLives(timeLowerBound))
{
+ this.unseqFiles.add(resource);
+ }
+ }
}
public void clear() throws IOException {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
index d55eaab..5d08391 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
@@ -155,12 +155,10 @@ public class MaxFileMergeFileSelector implements
IMergeFileSelector {
if (seqSelectedNum != resource.getSeqFiles().size()) {
selectOverlappedSeqFiles(unseqFile);
}
- boolean isClosed = checkChoosable(unseqFile);
- if (!isClosed) {
+ boolean isSeqFilesValid = checkIsSeqFilesValid();
+ if (!isSeqFilesValid) {
tmpSelectedSeqFiles.clear();
- unseqIndex++;
- timeConsumption = System.currentTimeMillis() - startTime;
- continue;
+ break;
}
tempMaxSeqFileCost = maxSeqFileCost;
@@ -190,8 +188,10 @@ public class MaxFileMergeFileSelector implements
IMergeFileSelector {
maxSeqFileCost = tempMaxSeqFileCost;
for (Integer seqIdx : tmpSelectedSeqFiles) {
- seqSelected[seqIdx] = true;
- seqSelectedNum++;
+ if (!seqSelected[seqIdx]) {
+ seqSelectedNum++;
+ seqSelected[seqIdx] = true;
+ }
}
totalCost += newCost;
logger.debug(
@@ -206,21 +206,31 @@ public class MaxFileMergeFileSelector implements
IMergeFileSelector {
return false;
}
- private boolean checkChoosable(TsFileResource unseqFile) {
- boolean choosable = unseqFile.isClosed() && !unseqFile.isMerging();
- if (!choosable) {
- return false;
- }
+ /**
+ * To avoid unseq data in seq files, cross space compaction should select
all the seq files which
+ * have overlap with unseq files whether they are compacting or not.
Therefore, before adding task
+ * into the queue, cross space compaction task should check whether source
seq files are being
+ * compacted or not to speed up compaction.
+ */
+ private boolean checkIsSeqFilesValid() {
for (Integer seqIdx : tmpSelectedSeqFiles) {
- if (!resource.getSeqFiles().get(seqIdx).isClosed()
- || resource.getSeqFiles().get(seqIdx).isMerging()) {
- choosable = false;
- break;
+ if (resource.getSeqFiles().get(seqIdx).isMerging()
+ || !resource.getSeqFiles().get(seqIdx).isClosed()) {
+ return false;
}
}
- return choosable;
+ return true;
}
+ /**
+ * Put the index of the seqFile that has an overlap with the specific
unseqFile and has not been
+ * selected by the file selector of the compaction task into the
tmpSelectedSeqFiles list. To
+ * determine whether overlap exists is to traverse each device ChunkGroup in
unseqFiles, and
+ * determine whether it overlaps with the same device ChunkGroup of each
seqFile that are not
+ * selected by the compaction task, if so, select this seqFile.
+ *
+ * @param unseqFile the tsFileResource of unseqFile to be compacted
+ */
private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
int tmpSelectedNum = 0;
for (String deviceId : unseqFile.getDevices()) {
@@ -230,20 +240,33 @@ public class MaxFileMergeFileSelector implements
IMergeFileSelector {
boolean noMoreOverlap = false;
for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap;
i++) {
TsFileResource seqFile = resource.getSeqFiles().get(i);
- if (seqSelected[i] || !seqFile.getDevices().contains(deviceId)) {
+ if (!seqFile.getDevices().contains(deviceId)) {
continue;
}
- // the open file's endTime is Long.MIN_VALUE, this will make the file
be filtered below
- long seqEndTime = seqFile.isClosed() ? seqFile.getEndTime(deviceId) :
Long.MAX_VALUE;
- if (unseqEndTime <= seqEndTime) {
- // the unseqFile overlaps current seqFile
+
+ long seqEndTime = seqFile.getEndTime(deviceId);
+ long seqStartTime = seqFile.getStartTime(deviceId);
+ if (unseqEndTime < seqStartTime) {
+ // Suppose the time range in unseq file is 10-20, seq file is 30-40.
If this unseq file
+ // has no overlapped seq files, then select this seq file.
Otherwise, skip this seq file.
+ // There is no more overlap later.
+ if (tmpSelectedSeqFiles.size() == 0) {
+ tmpSelectedSeqFiles.add(i);
+ }
+ noMoreOverlap = true;
+ } else if (!seqFile.isClosed()) {
+ // we cannot make sure whether unclosed file has overlap or not, so
we just add it.
+ tmpSelectedSeqFiles.add(i);
+ tmpSelectedNum++;
+ } else if (unseqEndTime <= seqEndTime) {
+ // if time range in unseq file is 10-20, seq file is 15-25, then
select this seq file and
+ // there is no more overlap later.
tmpSelectedSeqFiles.add(i);
tmpSelectedNum++;
- // the device of the unseqFile can not merge with later seqFiles
noMoreOverlap = true;
} else if (unseqStartTime <= seqEndTime) {
- // the device of the unseqFile may merge with later seqFiles
- // and the unseqFile overlaps current seqFile
+ // if time range in unseq file is 10-20, seq file is 0-15, then
select this seq file and
+ // there may be overlap later.
tmpSelectedSeqFiles.add(i);
tmpSelectedNum++;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 2ce4cbf..3a5af2c 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -603,9 +603,12 @@ public class TsFileResource {
isMerging = merging;
}
- /** check if any of the device lives over the given time bound */
+ /**
+ * Check if any of the devices lives over the given time bound. If the file
is not closed, then
+ * return true.
+ */
public boolean stillLives(long timeLowerBound) {
- return timeIndex.stillLives(timeLowerBound);
+ return !isClosed() || timeIndex.stillLives(timeLowerBound);
}
public boolean isSatisfied(
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
index a46b914..8fc9ac3 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.junit.Assert;
import org.junit.Test;
import java.io.File;
@@ -309,4 +310,327 @@ public class MaxFileMergeFileSelectorTest extends
MergeTest {
removeFiles(seqList, unseqList);
}
}
+
+ /**
+ * 5 source seq files: [11,11] [12,12] [13,13] [14,14] [15,15]<br>
+ * 10 source unseq files: [0,0] [1,1] ... [9,9]<br>
+ * selected seq file index: 1<br>
+ * selected unseq file index: 1 ~ 10
+ */
+ @Test
+ public void testUnseqFilesOverlappedWithOneSeqFile()
+ throws IOException, WriteProcessException, MergeException {
+ List<TsFileResource> seqList = new ArrayList<>();
+ List<TsFileResource> unseqList = new ArrayList<>();
+ // 5 seq files [11,11] [12,12] [13,13] ... [15,15]
+ int seqFileNum = 5;
+ for (int i = 11; i < seqFileNum + 11; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "seq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ prepareFile(fileResource, i, 1, i);
+ seqList.add(fileResource);
+ }
+ int unseqFileNum = 10;
+ // 10 unseq files [0,0] [1,1] ... [9,9]
+ for (int i = 0; i < unseqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "unseq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ prepareFile(fileResource, i, 1, i);
+ unseqList.add(fileResource);
+ }
+
+ MergeResource resource = new MergeResource(seqList, unseqList);
+ Assert.assertEquals(5, resource.getSeqFiles().size());
+ Assert.assertEquals(10, resource.getUnseqFiles().size());
+ IMergeFileSelector mergeFileSelector =
+ new MaxFileMergeFileSelector(resource, 500 * 1024 * 1024);
+ List[] result = mergeFileSelector.select();
+ Assert.assertEquals(2, result.length);
+ Assert.assertEquals(1, result[0].size());
+ Assert.assertEquals(10, result[1].size());
+ }
+
+ /**
+ * 5 source seq files: [11,11] [12,12] [13,13] [14,14] [15,15]<br>
+ * 1 source unseq file: [0 ~ 9]<br>
+ * selected seq file index: 1<br>
+ * selected unseq file index: 1
+ */
+ @Test
+ public void testOneUnseqFileOverlappedWithOneSeqFile()
+ throws IOException, WriteProcessException, MergeException {
+ List<TsFileResource> seqList = new ArrayList<>();
+ List<TsFileResource> unseqList = new ArrayList<>();
+ // 5 seq files [11,11] [12,12] [13,13] ... [15,15]
+ int seqFileNum = 5;
+ for (int i = 11; i < seqFileNum + 11; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "seq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ prepareFile(fileResource, i, 1, i);
+ seqList.add(fileResource);
+ }
+ int unseqFileNum = 1;
+ // 1 unseq file [0~9]
+ for (int i = 0; i < unseqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "unseq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ prepareFile(fileResource, i, 10, i);
+ unseqList.add(fileResource);
+ }
+
+ MergeResource resource = new MergeResource(seqList, unseqList);
+ Assert.assertEquals(5, resource.getSeqFiles().size());
+ Assert.assertEquals(1, resource.getUnseqFiles().size());
+ IMergeFileSelector mergeFileSelector =
+ new MaxFileMergeFileSelector(resource, 500 * 1024 * 1024);
+ List[] result = mergeFileSelector.select();
+ Assert.assertEquals(2, result.length);
+ Assert.assertEquals(1, result[0].size());
+ Assert.assertEquals(1, result[1].size());
+ }
+
+ /**
+ * 5 source seq files: [11,11] [12,12] [13,13] [14,14] [15,15]<br>
+ * 2 source unseq files: [7~9] [10~13]<br>
+ * selected seq file index: 1 2 3 <br>
+ * selected unseq file index: 1 2
+ */
+ @Test
+ public void testUnseqFilesOverlapped() throws IOException,
WriteProcessException, MergeException {
+ List<TsFileResource> seqList = new ArrayList<>();
+ List<TsFileResource> unseqList = new ArrayList<>();
+ // 5 seq files [11,11] [12,12] [13,13] ... [15,15]
+ int seqFileNum = 5;
+ for (int i = 11; i < seqFileNum + 11; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "seq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ prepareFile(fileResource, i, 1, i);
+ seqList.add(fileResource);
+ }
+ int unseqFileNum = 2;
+ // 2 unseq files [7~9] [10~13]
+ for (int i = 0; i < unseqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "unseq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ unseqList.add(fileResource);
+ }
+ prepareFile(unseqList.get(0), 7, 3, 7);
+ prepareFile(unseqList.get(1), 10, 4, 10);
+
+ MergeResource resource = new MergeResource(seqList, unseqList);
+ Assert.assertEquals(5, resource.getSeqFiles().size());
+ Assert.assertEquals(2, resource.getUnseqFiles().size());
+ IMergeFileSelector mergeFileSelector =
+ new MaxFileMergeFileSelector(resource, 500 * 1024 * 1024);
+ List[] result = mergeFileSelector.select();
+ Assert.assertEquals(2, result.length);
+ Assert.assertEquals(3, result[0].size());
+ Assert.assertEquals(2, result[1].size());
+ }
+
+ /**
+ * 5 source seq files: [11,11] [12,12] [13,13] [14,14] [15,15]<br>
+ * 4 source unseq files: [7~9] [10~13] [14~16] [17~18]<br>
+ * selected seq file index: 1 2 3 4 5<br>
+ * selected unseq file index: 1 2 3 4
+ */
+ @Test
+ public void testAllUnseqFilesOverlapped()
+ throws IOException, WriteProcessException, MergeException {
+ List<TsFileResource> seqList = new ArrayList<>();
+ List<TsFileResource> unseqList = new ArrayList<>();
+ // 5 seq files [11,11] [12,12] [13,13] ... [15,15]
+ int seqFileNum = 5;
+ for (int i = 11; i < seqFileNum + 11; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "seq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ prepareFile(fileResource, i, 1, i);
+ seqList.add(fileResource);
+ }
+ int unseqFileNum = 4;
+ // 4 unseq files [7~9] [10~13] [14~16] [17~18]
+ for (int i = 0; i < unseqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "unseq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ unseqList.add(fileResource);
+ }
+ prepareFile(unseqList.get(0), 7, 3, 7);
+ prepareFile(unseqList.get(1), 10, 4, 10);
+ prepareFile(unseqList.get(2), 14, 3, 14);
+ prepareFile(unseqList.get(3), 17, 2, 17);
+
+ MergeResource resource = new MergeResource(seqList, unseqList);
+ Assert.assertEquals(5, resource.getSeqFiles().size());
+ Assert.assertEquals(4, resource.getUnseqFiles().size());
+ IMergeFileSelector mergeFileSelector =
+ new MaxFileMergeFileSelector(resource, 500 * 1024 * 1024);
+ List[] result = mergeFileSelector.select();
+ Assert.assertEquals(2, result.length);
+ Assert.assertEquals(5, result[0].size());
+ Assert.assertEquals(4, result[1].size());
+ }
+
+ /**
+ * 5 source seq files: [11,11] [12,12] [13,13] [14,14] [15,15]<br>
+ * 4 source unseq files: [7~9] [10~13] [14~16] [17~18]<br>
+ * while the fourth seq file is not closed.<br>
+ * selected seq file index: 1 2 3 <br>
+ * selected unseq file index: 1 2
+ */
+ @Test
+ public void testAllUnseqFilesOverlappedWithSeqFileOpen()
+ throws IOException, WriteProcessException, MergeException {
+ List<TsFileResource> seqList = new ArrayList<>();
+ List<TsFileResource> unseqList = new ArrayList<>();
+ // 5 seq files [11,11] [12,12] [13,13] ... [15,15]
+ int seqFileNum = 5;
+ for (int i = 11; i < seqFileNum + 11; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "seq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ prepareFile(fileResource, i, 1, i);
+ seqList.add(fileResource);
+ }
+ seqList.get(3).setClosed(false);
+ int unseqFileNum = 4;
+ // 4 unseq files [7~9] [10~13] [14~16] [17~18]
+ for (int i = 0; i < unseqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.OUTPUT_DATA_DIR.concat(
+ 10
+ + "unseq"
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 10
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource fileResource = new TsFileResource(file);
+ fileResource.setClosed(true);
+ unseqList.add(fileResource);
+ }
+ prepareFile(unseqList.get(0), 7, 3, 7);
+ prepareFile(unseqList.get(1), 10, 4, 10);
+ prepareFile(unseqList.get(2), 14, 3, 14);
+ prepareFile(unseqList.get(3), 17, 2, 17);
+
+ MergeResource resource = new MergeResource(seqList, unseqList);
+ Assert.assertEquals(5, resource.getSeqFiles().size());
+ Assert.assertEquals(4, resource.getUnseqFiles().size());
+ IMergeFileSelector mergeFileSelector =
+ new MaxFileMergeFileSelector(resource, 500 * 1024 * 1024);
+ List[] result = mergeFileSelector.select();
+ Assert.assertEquals(2, result.length);
+ Assert.assertEquals(3, result[0].size());
+ Assert.assertEquals(2, result[1].size());
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index b49d6bb..6f058c4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -559,8 +559,8 @@ public class MergeTaskTest extends MergeTest {
}
/**
- * merge 3 seqFile and 1 unseqFile seqFile1: d1.s1:0-100 d1.s2:0-100
seqFile2: d1.s1:100-200
- * seqFile3: d2.s1:0-100 unseqFile1: d1.s3:0-100
+ * merge 3 seqFile and 1 unseqFile seqFile1: d0.s0:0-100 d0.s1:0-100
seqFile2: d0.s0:100-200
+ * seqFile3: d1.s0:0-100 unseqFile1: d0.s2:0-100
*/
@Test
public void testMergeWithSeqFileMissSomeSensorAndDevice() throws Exception {
@@ -583,6 +583,7 @@ public class MergeTaskTest extends MergeTest {
seqTsFile1Data.put(new Pair<>(deviceIds[0], measurementSchemas[0]), new
Pair<>(0L, 100L));
seqTsFile1Data.put(new Pair<>(deviceIds[0], measurementSchemas[1]), new
Pair<>(0L, 100L));
prepareFileWithSensorAndTime(seqTsFile1, seqTsFile1Data);
+ seqTsFile1.setClosed(true);
testSeqResources.add(seqTsFile1);
file =
@@ -600,6 +601,7 @@ public class MergeTaskTest extends MergeTest {
Map<Pair<String, MeasurementSchema>, Pair<Long, Long>> seqTsFileData2 =
new HashMap<>();
seqTsFileData2.put(new Pair<>(deviceIds[0], measurementSchemas[0]), new
Pair<>(100L, 200L));
prepareFileWithSensorAndTime(seqTsFile2, seqTsFileData2);
+ seqTsFile2.setClosed(true);
testSeqResources.add(seqTsFile2);
file =
@@ -617,6 +619,7 @@ public class MergeTaskTest extends MergeTest {
Map<Pair<String, MeasurementSchema>, Pair<Long, Long>> seqTsFileData3 =
new HashMap<>();
seqTsFileData3.put(new Pair<>(deviceIds[1], measurementSchemas[0]), new
Pair<>(0L, 100L));
prepareFileWithSensorAndTime(seqTsFile3, seqTsFileData3);
+ seqTsFile3.setClosed(true);
testSeqResources.add(seqTsFile3);
file =
@@ -634,6 +637,7 @@ public class MergeTaskTest extends MergeTest {
Map<Pair<String, MeasurementSchema>, Pair<Long, Long>> unseqTsFileData =
new HashMap<>();
unseqTsFileData.put(new Pair<>(deviceIds[0], measurementSchemas[2]), new
Pair<>(0L, 100L));
prepareFileWithSensorAndTime(unseqTsFile, unseqTsFileData);
+ unseqTsFile.setClosed(true);
testUnseqResources.add(unseqTsFile);
MergeTask mergeTask =