This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 66ba37459c [To rel/0.13][IOTDB-5726]Select the last sealed seq file
for nonOverlap unseq files to compact in cross compaction (#9495)
66ba37459c is described below
commit 66ba37459c2f6aefab0a32d2a33656e4f6e201aa
Author: 周沛辰 <[email protected]>
AuthorDate: Fri Apr 7 15:14:44 2023 +0800
[To rel/0.13][IOTDB-5726]Select the last sealed seq file for nonOverlap
unseq files to compact in cross compaction (#9495)
---
.../selector/RewriteCompactionFileSelector.java | 26 ++-
.../sizetiered/SizeTieredCompactionSelector.java | 4 +-
.../engine/compaction/AbstractCompactionTest.java | 76 +++++++
.../cross/CrossSpaceCompactionValidationTest.java | 241 ++++++++++++++++++++-
4 files changed, 341 insertions(+), 6 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
index e35fe27bdf..1f12fd5f24 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.compaction.cross.rewrite.selector;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore;
import
org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.DeviceInfo;
@@ -48,7 +49,8 @@ import java.util.Map;
*/
public class RewriteCompactionFileSelector implements
ICrossSpaceMergeFileSelector {
- private static final Logger logger =
LoggerFactory.getLogger(RewriteCompactionFileSelector.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private static final String LOG_FILE_COST = "Memory cost of file {} is {}";
CrossSpaceCompactionResource resource;
@@ -189,6 +191,9 @@ public class RewriteCompactionFileSelector implements
ICrossSpaceMergeFileSelect
if (seqSelectedNum != resource.getSeqFiles().size()) {
selectOverlappedSeqFiles(unseqFile);
}
+ if (tmpSelectedSeqFiles.isEmpty() &&
!tryToSelectLatestSealedSeqFile(unseqFile)) {
+ break;
+ }
boolean isSeqFilesValid = checkIsSeqFilesValid();
if (!isSeqFilesValid) {
tmpSelectedSeqFiles.clear();
@@ -229,6 +234,25 @@ public class RewriteCompactionFileSelector implements
ICrossSpaceMergeFileSelect
}
}
+  /**
+   * Fallback for an unseq file that overlaps no seq file: walks the seq file list of the
+   * compaction resource from its last entry towards its first and picks the first closed (sealed)
+   * file found, recording that file's index in {@code tmpSelectedSeqFiles}.
+   *
+   * <p>Notice: if data is deleted and an inner space compaction then runs, it may lead to
+   * unseqStartTime > seqEndTime, or to some devices existing in the unseq files but not in the seq
+   * files. Either case leaves the unseq file without any overlapping seq file.
+   *
+   * @param unseqFile the unseq file that has no overlapping seq file; only used for logging here
+   * @return {@code true} if a closed seq file was found and its index was added to {@code
+   *     tmpSelectedSeqFiles}, {@code false} if none of the seq files is closed
+   */
+  private boolean tryToSelectLatestSealedSeqFile(TsFileResource unseqFile) {
+    logger.info("Unseq file {} does not overlap with seq files.", unseqFile);
+    List<TsFileResource> candidates = resource.getSeqFiles();
+    int index = candidates.size() - 1;
+    // Scan backwards so that the first closed file encountered is the latest one in the list.
+    while (index >= 0) {
+      TsFileResource candidate = candidates.get(index);
+      if (candidate.isClosed()) {
+        logger.info("Select the latest closed seq file {} for it to compact.", candidate);
+        tmpSelectedSeqFiles.add(index);
+        return true;
+      }
+      index--;
+    }
+    return false;
+  }
+
private boolean updateSelectedFiles(long newCost, TsFileResource unseqFile) {
if (selectedUnseqFiles.size() == 0
|| (seqSelectedNum + selectedUnseqFiles.size() + 1 +
tmpSelectedSeqFiles.size()
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index ee4a45d9db..2071e21ba1 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -113,7 +113,7 @@ public class SizeTieredCompactionSelector extends
AbstractInnerSpaceCompactionSe
* @return return whether to continue the search to higher levels
* @throws IOException
*/
- private boolean selectLevelTask(
+ public boolean selectLevelTask(
int level, PriorityQueue<Pair<List<TsFileResource>, Long>>
taskPriorityQueue)
throws IOException {
boolean shouldContinueToSearch = true;
@@ -180,7 +180,7 @@ public class SizeTieredCompactionSelector extends
AbstractInnerSpaceCompactionSe
return
CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);
}
- private class SizeTieredCompactionTaskComparator
+ public static class SizeTieredCompactionTaskComparator
implements Comparator<Pair<List<TsFileResource>, Long>> {
@Override
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
index 97084de2b6..1f47b67e9c 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -35,8 +37,12 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
@@ -45,15 +51,21 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+import static org.junit.Assert.fail;
public class AbstractCompactionTest {
protected int seqFileNum = 5;
protected int unseqFileNum = 0;
protected List<TsFileResource> seqResources = new ArrayList<>();
protected List<TsFileResource> unseqResources = new ArrayList<>();
+
+ protected TsFileManager tsFileManager =
+ new TsFileManager(TsFileGeneratorUtils.testStorageGroup, "0",
STORAGE_GROUP_DIR.getPath());
private int chunkGroupSize = 0;
private int pageSize = 0;
protected String COMPACTION_TEST_SG = TsFileGeneratorUtils.testStorageGroup;
@@ -341,6 +353,70 @@ public class AbstractCompactionTest {
}
}
+  /**
+   * Reads every point of the given series from the seq and unseq files currently registered in
+   * {@code tsFileManager} and collects them per series.
+   *
+   * @param timeseriesPaths the series to read; the returned map keeps this iteration order
+   * @param dataTypes not used by this method; the data type is taken from each path instead
+   * @return a map from each requested path to the list of time-value pairs read for it, in the
+   *     order the reader produced them (an empty list if the reader yields nothing)
+   * @throws IOException if the underlying batch reader fails
+   */
+  protected Map<PartialPath, List<TimeValuePair>> readSourceFiles(
+      List<PartialPath> timeseriesPaths, List<TSDataType> dataTypes) throws IOException {
+    Map<PartialPath, List<TimeValuePair>> pointsBySeries = new LinkedHashMap<>();
+    for (PartialPath seriesPath : timeseriesPaths) {
+      List<TimeValuePair> points = new ArrayList<>();
+      pointsBySeries.put(seriesPath, points);
+      IBatchReader reader =
+          new SeriesRawDataBatchReader(
+              seriesPath,
+              seriesPath.getSeriesType(),
+              EnvironmentUtils.TEST_QUERY_CONTEXT,
+              tsFileManager.getTsFileList(true),
+              tsFileManager.getTsFileList(false),
+              null,
+              null,
+              true);
+      while (reader.hasNextBatch()) {
+        // Drain the current batch point by point before asking the reader for the next one.
+        for (BatchData batch = reader.nextBatch(); batch.hasCurrent(); batch.next()) {
+          long time = batch.currentTime();
+          TsPrimitiveType value =
+              TsPrimitiveType.getByType(seriesPath.getSeriesType(), batch.currentValue());
+          points.add(new TimeValuePair(time, value));
+        }
+      }
+    }
+    return pointsBySeries;
+  }
+
+  /**
+   * Verifies that the files currently registered in {@code tsFileManager} contain exactly the
+   * points captured earlier in {@code sourceDatas}, series by series and in the same order.
+   *
+   * <p>The expected lists are walked with an index cursor and are not modified. A target that
+   * holds more points than expected, fewer points than expected, or a differing timestamp or
+   * value fails the test with a message naming the affected series.
+   *
+   * @param sourceDatas expected points per series, e.g. as returned by {@code readSourceFiles}
+   * @param dataTypes not used by this method; the data type is taken from each path instead
+   * @throws IOException if the underlying batch reader fails
+   */
+  protected void validateTargetDatas(
+      Map<PartialPath, List<TimeValuePair>> sourceDatas, List<TSDataType> dataTypes)
+      throws IOException {
+    for (Map.Entry<PartialPath, List<TimeValuePair>> entry : sourceDatas.entrySet()) {
+      PartialPath path = entry.getKey();
+      List<TimeValuePair> expectedPoints = entry.getValue();
+      IBatchReader tsFilesReader =
+          new SeriesRawDataBatchReader(
+              path,
+              path.getSeriesType(),
+              EnvironmentUtils.TEST_QUERY_CONTEXT,
+              tsFileManager.getTsFileList(true),
+              tsFileManager.getTsFileList(false),
+              null,
+              null,
+              true);
+      // Index of the next expected point; avoids the O(n) cost of removing the list head.
+      int matched = 0;
+      while (tsFilesReader.hasNextBatch()) {
+        BatchData batchData = tsFilesReader.nextBatch();
+        while (batchData.hasCurrent()) {
+          if (matched >= expectedPoints.size()) {
+            fail(
+                "Series "
+                    + path
+                    + " has an unexpected extra point at time "
+                    + batchData.currentTime()
+                    + " after compaction");
+          }
+          TimeValuePair expected = expectedPoints.get(matched);
+          matched++;
+          Assert.assertEquals(
+              "Timestamp mismatch in series " + path,
+              expected.getTimestamp(),
+              batchData.currentTime());
+          Assert.assertEquals(
+              "Value mismatch in series " + path + " at time " + expected.getTimestamp(),
+              expected.getValue(),
+              TsPrimitiveType.getByType(path.getSeriesType(), batchData.currentValue()));
+          batchData.next();
+        }
+      }
+      if (matched < expectedPoints.size()) {
+        // Expected points remain that the target files did not return: data was lost.
+        fail(
+            "Series "
+                + path
+                + " lost "
+                + (expectedPoints.size() - matched)
+                + " point(s) after compaction");
+      }
+    }
+  }
+
protected void deleteTimeseriesInMManager(List<String> timeseries) throws
MetadataException {
for (String path : timeseries) {
IoTDB.metaManager.deleteTimeseries(new PartialPath(path));
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
index 109d49149e..1bf87595c5 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
@@ -26,16 +26,24 @@ import
org.apache.iotdb.db.engine.compaction.cross.rewrite.manage.CrossSpaceComp
import
org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceMergeFileSelector;
import
org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
import
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.RewriteCrossSpaceCompactionTask;
-import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import
org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTaskFactory;
+import
org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionSelector;
+import
org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionTask;
+import
org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.tools.validate.TsFileValidationTool;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.After;
import org.junit.Assert;
@@ -45,12 +53,16 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+
public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest
{
- TsFileManager tsFileManager =
- new TsFileManager(COMPACTION_TEST_SG, "0", STORAGE_GROUP_DIR.getPath());
private final String oldThreadName = Thread.currentThread().getName();
@@ -2102,4 +2114,227 @@ public class CrossSpaceCompactionValidationTest extends
AbstractCompactionTest {
Assert.assertEquals(0, TsFileValidationTool.badFileNum);
TsFileValidationTool.clearMap();
}
+
+  /**
+   * Deletes part of the seq data, runs a level-0 inner space compaction on the seq files, and then
+   * checks that the cross space selector still picks one seq file for the two unseq files and
+   * that no data is lost by the resulting cross space compaction.
+   */
+  @Test
+  public void testNonAlignedUnseqFilesNotOverlapWithSeqFiles1() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
+    createFiles(5, 10, 5, 1000, 0, 0, 100, 100, false, true);
+    createFiles(2, 5, 10, 500, 6000, 6000, 0, 100, false, false);
+    createFiles(3, 10, 5, 1000, 7500, 7500, 100, 100, false, true);
+
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+
+    // delete s0 ~ s4 of devices d0 ~ d4 in seq files
+    Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+    for (int d = 0; d < 5; d++) {
+      for (int m = 0; m < 5; m++) {
+        deleteMap.put(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + d + PATH_SEPARATOR + "s" + m,
+            new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
+      }
+    }
+    for (TsFileResource resource : seqResources) {
+      CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
+    }
+
+    // capture the data of d0 ~ d9 / s0 ~ s9 before any compaction runs
+    List<PartialPath> timeseriesPaths = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        timeseriesPaths.add(
+            new MeasurementPath(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+                TSDataType.INT64));
+      }
+    }
+    Map<PartialPath, List<TimeValuePair>> sourceData =
+        readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+    // inner seq space compact
+    SizeTieredCompactionSelector sizeTieredCompactionSelector =
+        new SizeTieredCompactionSelector(
+            COMPACTION_TEST_SG, "0", 0, tsFileManager, true, new InnerSpaceCompactionTaskFactory());
+
+    PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
+        new PriorityQueue<>(new SizeTieredCompactionSelector.SizeTieredCompactionTaskComparator());
+    sizeTieredCompactionSelector.selectLevelTask(0, taskPriorityQueue);
+    for (Pair<List<TsFileResource>, Long> taskResource : taskPriorityQueue) {
+      new SizeTieredCompactionTask(
+              COMPACTION_TEST_SG,
+              "0",
+              0,
+              tsFileManager,
+              taskResource.left,
+              true,
+              new AtomicInteger(0))
+          .call();
+    }
+
+    // select cross compaction
+    CrossSpaceCompactionResource crossSpaceCompactionResource =
+        new CrossSpaceCompactionResource(
+            tsFileManager.getTsFileList(true), tsFileManager.getTsFileList(false));
+    RewriteCompactionFileSelector crossSpaceCompactionSelector =
+        new RewriteCompactionFileSelector(crossSpaceCompactionResource, Long.MAX_VALUE);
+    List[] pairs = crossSpaceCompactionSelector.select();
+    Assert.assertEquals(2, pairs.length);
+    Assert.assertEquals(1, pairs[0].size());
+    Assert.assertEquals(2, pairs[1].size());
+
+    new RewriteCrossSpaceCompactionTask(
+            "0", COMPACTION_TEST_SG, 0, tsFileManager, pairs[0], pairs[1], new AtomicInteger(0), 0)
+        .call();
+
+    validateSeqFiles();
+    validateTargetDatas(sourceData, Collections.emptyList());
+  }
+
+ @Test
+ public void testNonAlignedUnseqFilesNotOverlapWithSeqFiles2() throws
Exception {
+
IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
+ createFiles(5, 10, 5, 1000, 0, 0, 100, 100, false, true);
+ createFiles(2, 5, 10, 500, 6000, 6000, 0, 100, false, false);
+
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // delete d0 ~ d5 in seq files
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ for (int d = 0; d < 5; d++) {
+ for (int m = 0; m < 5; m++) {
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + d + PATH_SEPARATOR +
"s" + m,
+ new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
+ }
+ }
+ for (TsFileResource resource : seqResources) {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
+ }
+
+ List<PartialPath> timeseriesPaths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ timeseriesPaths.add(
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR
+ "s" + j,
+ TSDataType.INT64));
+ }
+ }
+ Map<PartialPath, List<TimeValuePair>> sourceData =
+ readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+ // inner seq space compact
+ SizeTieredCompactionSelector sizeTieredCompactionSelector =
+ new SizeTieredCompactionSelector(
+ COMPACTION_TEST_SG, "0", 0, tsFileManager, true, new
InnerSpaceCompactionTaskFactory());
+
+ PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
+ new PriorityQueue<>(new
SizeTieredCompactionSelector.SizeTieredCompactionTaskComparator());
+ sizeTieredCompactionSelector.selectLevelTask(0, taskPriorityQueue);
+ ;
+ for (Pair<List<TsFileResource>, Long> taskResource : taskPriorityQueue) {
+ new SizeTieredCompactionTask(
+ COMPACTION_TEST_SG,
+ "0",
+ 0,
+ tsFileManager,
+ taskResource.left,
+ true,
+ new AtomicInteger(0))
+ .call();
+ }
+
+ // select cross compaction
+ CrossSpaceCompactionResource crossSpaceCompactionResource =
+ new CrossSpaceCompactionResource(
+ tsFileManager.getTsFileList(true),
tsFileManager.getTsFileList(false));
+ RewriteCompactionFileSelector crossSpaceCompactionSelector =
+ new RewriteCompactionFileSelector(crossSpaceCompactionResource,
Long.MAX_VALUE);
+ List[] pairs = crossSpaceCompactionSelector.select();
+ Assert.assertEquals(2, pairs.length);
+ Assert.assertEquals(1, pairs[0].size());
+ Assert.assertEquals(2, pairs[1].size());
+
+ new RewriteCrossSpaceCompactionTask(
+ "0", COMPACTION_TEST_SG, 0, tsFileManager, pairs[0], pairs[1], new
AtomicInteger(0), 0)
+ .call();
+
+ validateSeqFiles();
+ validateTargetDatas(sourceData, Collections.emptyList());
+ }
+
+  /**
+   * Variant with four initial seq files, two unseq files and one later seq file. After the
+   * deletions and a level-0 inner space compaction of the seq files, the cross space selector is
+   * expected to return one seq file and both unseq files, and the cross space compaction must
+   * keep all previously readable data.
+   */
+  @Test
+  public void testNonAlignedUnseqFilesNotOverlapWithSeqFiles3() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
+    createFiles(4, 10, 5, 1000, 0, 0, 100, 100, false, true);
+    createFiles(2, 5, 10, 500, 6000, 6000, 0, 100, false, false);
+    createFiles(1, 10, 5, 1000, 7500, 7500, 100, 100, false, true);
+
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+
+    // Mark s0 ~ s4 of devices d0 ~ d4 as fully deleted in every seq file.
+    Map<String, Pair<Long, Long>> deletions = new HashMap<>();
+    for (int device = 0; device < 5; device++) {
+      String devicePrefix = COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR;
+      for (int sensor = 0; sensor < 5; sensor++) {
+        deletions.put(devicePrefix + "s" + sensor, new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
+      }
+    }
+    for (TsFileResource seqFile : seqResources) {
+      CompactionFileGeneratorUtils.generateMods(deletions, seqFile, false);
+    }
+
+    // Snapshot d0 ~ d9 / s0 ~ s9 before any compaction so the result can be compared afterwards.
+    List<PartialPath> seriesPaths = new ArrayList<>();
+    for (int device = 0; device < 10; device++) {
+      String devicePrefix = COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR;
+      for (int sensor = 0; sensor < 10; sensor++) {
+        seriesPaths.add(new MeasurementPath(devicePrefix + "s" + sensor, TSDataType.INT64));
+      }
+    }
+    Map<PartialPath, List<TimeValuePair>> expectedData =
+        readSourceFiles(seriesPaths, Collections.emptyList());
+
+    // Run the level-0 inner space compaction tasks selected for the seq files.
+    SizeTieredCompactionSelector innerSelector =
+        new SizeTieredCompactionSelector(
+            COMPACTION_TEST_SG, "0", 0, tsFileManager, true, new InnerSpaceCompactionTaskFactory());
+    PriorityQueue<Pair<List<TsFileResource>, Long>> innerTasks =
+        new PriorityQueue<>(new SizeTieredCompactionSelector.SizeTieredCompactionTaskComparator());
+    innerSelector.selectLevelTask(0, innerTasks);
+    for (Pair<List<TsFileResource>, Long> innerTask : innerTasks) {
+      SizeTieredCompactionTask compactionTask =
+          new SizeTieredCompactionTask(
+              COMPACTION_TEST_SG,
+              "0",
+              0,
+              tsFileManager,
+              innerTask.left,
+              true,
+              new AtomicInteger(0));
+      compactionTask.call();
+    }
+
+    // Select the files for cross space compaction: one seq file and both unseq files.
+    CrossSpaceCompactionResource crossResource =
+        new CrossSpaceCompactionResource(
+            tsFileManager.getTsFileList(true), tsFileManager.getTsFileList(false));
+    RewriteCompactionFileSelector crossSelector =
+        new RewriteCompactionFileSelector(crossResource, Long.MAX_VALUE);
+    List[] selected = crossSelector.select();
+    Assert.assertEquals(2, selected.length);
+    Assert.assertEquals(1, selected[0].size());
+    Assert.assertEquals(2, selected[1].size());
+
+    RewriteCrossSpaceCompactionTask crossTask =
+        new RewriteCrossSpaceCompactionTask(
+            "0",
+            COMPACTION_TEST_SG,
+            0,
+            tsFileManager,
+            selected[0],
+            selected[1],
+            new AtomicInteger(0),
+            0);
+    crossTask.call();
+
+    validateSeqFiles();
+    validateTargetDatas(expectedData, Collections.emptyList());
+  }
}