This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 66ba37459c [To rel/0.13][IOTDB-5726]Select the last sealed seq file 
for nonOverlap unseq files to compact in cross compaction (#9495)
66ba37459c is described below

commit 66ba37459c2f6aefab0a32d2a33656e4f6e201aa
Author: 周沛辰 <[email protected]>
AuthorDate: Fri Apr 7 15:14:44 2023 +0800

    [To rel/0.13][IOTDB-5726]Select the last sealed seq file for nonOverlap 
unseq files to compact in cross compaction (#9495)
---
 .../selector/RewriteCompactionFileSelector.java    |  26 ++-
 .../sizetiered/SizeTieredCompactionSelector.java   |   4 +-
 .../engine/compaction/AbstractCompactionTest.java  |  76 +++++++
 .../cross/CrossSpaceCompactionValidationTest.java  | 241 ++++++++++++++++++++-
 4 files changed, 341 insertions(+), 6 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
index e35fe27bdf..1f12fd5f24 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.engine.compaction.cross.rewrite.selector;
 
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore;
 import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.TsFileDeviceInfoStore.DeviceInfo;
@@ -48,7 +49,8 @@ import java.util.Map;
  */
 public class RewriteCompactionFileSelector implements 
ICrossSpaceMergeFileSelector {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(RewriteCompactionFileSelector.class);
+  private static final Logger logger =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private static final String LOG_FILE_COST = "Memory cost of file {} is {}";
 
   CrossSpaceCompactionResource resource;
@@ -189,6 +191,9 @@ public class RewriteCompactionFileSelector implements 
ICrossSpaceMergeFileSelect
       if (seqSelectedNum != resource.getSeqFiles().size()) {
         selectOverlappedSeqFiles(unseqFile);
       }
+      if (tmpSelectedSeqFiles.isEmpty() && 
!tryToSelectLatestSealedSeqFile(unseqFile)) {
+        break;
+      }
       boolean isSeqFilesValid = checkIsSeqFilesValid();
       if (!isSeqFilesValid) {
         tmpSelectedSeqFiles.clear();
@@ -229,6 +234,25 @@ public class RewriteCompactionFileSelector implements 
ICrossSpaceMergeFileSelect
     }
   }
 
+  /**
+   * If the unseq file does not overlap with any seq files, then select the 
latest sealed seq file
+   * for it to compact with. Notice: If the data is deleted and then start an 
inner space
+   * compaction, it may cause unseqStartTime > seqEndTime or partial devices 
are in the unseq files
+   * but not in seq files, which will cause the unseq file not overlap with 
any seq files
+   */
+  private boolean tryToSelectLatestSealedSeqFile(TsFileResource unseqFile) {
+    logger.info("Unseq file {} does not overlap with seq files.", unseqFile);
+    List<TsFileResource> seqResources = resource.getSeqFiles();
+    for (int i = seqResources.size() - 1; i >= 0; i--) {
+      if (seqResources.get(i).isClosed()) {
+        logger.info("Select the latest closed seq file {} for it to compact.", 
seqResources.get(i));
+        tmpSelectedSeqFiles.add(i);
+        return true;
+      }
+    }
+    return false;
+  }
+
   private boolean updateSelectedFiles(long newCost, TsFileResource unseqFile) {
     if (selectedUnseqFiles.size() == 0
         || (seqSelectedNum + selectedUnseqFiles.size() + 1 + 
tmpSelectedSeqFiles.size()
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index ee4a45d9db..2071e21ba1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -113,7 +113,7 @@ public class SizeTieredCompactionSelector extends 
AbstractInnerSpaceCompactionSe
    * @return return whether to continue the search to higher levels
    * @throws IOException
    */
-  private boolean selectLevelTask(
+  public boolean selectLevelTask(
       int level, PriorityQueue<Pair<List<TsFileResource>, Long>> 
taskPriorityQueue)
       throws IOException {
     boolean shouldContinueToSearch = true;
@@ -180,7 +180,7 @@ public class SizeTieredCompactionSelector extends 
AbstractInnerSpaceCompactionSe
     return 
CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);
   }
 
-  private class SizeTieredCompactionTaskComparator
+  public static class SizeTieredCompactionTaskComparator
       implements Comparator<Pair<List<TsFileResource>, Long>> {
 
     @Override
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
index 97084de2b6..1f47b67e9c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.engine.compaction;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -35,8 +37,12 @@ import 
org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
@@ -45,15 +51,21 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+import static org.junit.Assert.fail;
 
 public class AbstractCompactionTest {
   protected int seqFileNum = 5;
   protected int unseqFileNum = 0;
   protected List<TsFileResource> seqResources = new ArrayList<>();
   protected List<TsFileResource> unseqResources = new ArrayList<>();
+
+  protected TsFileManager tsFileManager =
+      new TsFileManager(TsFileGeneratorUtils.testStorageGroup, "0", 
STORAGE_GROUP_DIR.getPath());
   private int chunkGroupSize = 0;
   private int pageSize = 0;
   protected String COMPACTION_TEST_SG = TsFileGeneratorUtils.testStorageGroup;
@@ -341,6 +353,70 @@ public class AbstractCompactionTest {
     }
   }
 
+  protected Map<PartialPath, List<TimeValuePair>> readSourceFiles(
+      List<PartialPath> timeseriesPaths, List<TSDataType> dataTypes) throws 
IOException {
+    Map<PartialPath, List<TimeValuePair>> sourceData = new LinkedHashMap<>();
+    for (PartialPath path : timeseriesPaths) {
+      List<TimeValuePair> dataList = new ArrayList<>();
+      sourceData.put(path, dataList);
+      IBatchReader tsFilesReader =
+          new SeriesRawDataBatchReader(
+              path,
+              path.getSeriesType(),
+              EnvironmentUtils.TEST_QUERY_CONTEXT,
+              tsFileManager.getTsFileList(true),
+              tsFileManager.getTsFileList(false),
+              null,
+              null,
+              true);
+      while (tsFilesReader.hasNextBatch()) {
+        BatchData batchData = tsFilesReader.nextBatch();
+        while (batchData.hasCurrent()) {
+          dataList.add(
+              new TimeValuePair(
+                  batchData.currentTime(),
+                  TsPrimitiveType.getByType(path.getSeriesType(), 
batchData.currentValue())));
+          batchData.next();
+        }
+      }
+    }
+    return sourceData;
+  }
+
+  protected void validateTargetDatas(
+      Map<PartialPath, List<TimeValuePair>> sourceDatas, List<TSDataType> 
dataTypes)
+      throws IOException {
+    for (Map.Entry<PartialPath, List<TimeValuePair>> entry : 
sourceDatas.entrySet()) {
+      IBatchReader tsFilesReader =
+          new SeriesRawDataBatchReader(
+              entry.getKey(),
+              entry.getKey().getSeriesType(),
+              EnvironmentUtils.TEST_QUERY_CONTEXT,
+              tsFileManager.getTsFileList(true),
+              tsFileManager.getTsFileList(false),
+              null,
+              null,
+              true);
+      List<TimeValuePair> timeseriesData = entry.getValue();
+      while (tsFilesReader.hasNextBatch()) {
+        BatchData batchData = tsFilesReader.nextBatch();
+        while (batchData.hasCurrent()) {
+          TimeValuePair data = timeseriesData.remove(0);
+          Assert.assertEquals(data.getTimestamp(), batchData.currentTime());
+          Assert.assertEquals(
+              data.getValue(),
+              TsPrimitiveType.getByType(entry.getKey().getSeriesType(), 
batchData.currentValue()));
+          batchData.next();
+        }
+      }
+      if (timeseriesData.size() > 0) {
+        // there are still data points left, which are not in the target file. 
Lost the data after
+        // compaction.
+        fail();
+      }
+    }
+  }
+
   protected void deleteTimeseriesInMManager(List<String> timeseries) throws 
MetadataException {
     for (String path : timeseries) {
       IoTDB.metaManager.deleteTimeseries(new PartialPath(path));
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
index 109d49149e..1bf87595c5 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
@@ -26,16 +26,24 @@ import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.manage.CrossSpaceComp
 import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceMergeFileSelector;
 import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
 import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.RewriteCrossSpaceCompactionTask;
-import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import 
org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTaskFactory;
+import 
org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionSelector;
+import 
org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionTask;
+import 
org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.tools.validate.TsFileValidationTool;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -45,12 +53,16 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+
 public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest 
{
-  TsFileManager tsFileManager =
-      new TsFileManager(COMPACTION_TEST_SG, "0", STORAGE_GROUP_DIR.getPath());
 
   private final String oldThreadName = Thread.currentThread().getName();
 
@@ -2102,4 +2114,227 @@ public class CrossSpaceCompactionValidationTest extends 
AbstractCompactionTest {
     Assert.assertEquals(0, TsFileValidationTool.badFileNum);
     TsFileValidationTool.clearMap();
   }
+
+  @Test
+  public void testNonAlignedUnseqFilesNotOverlapWithSeqFiles1() throws 
Exception {
+    
IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
+    createFiles(5, 10, 5, 1000, 0, 0, 100, 100, false, true);
+    createFiles(2, 5, 10, 500, 6000, 6000, 0, 100, false, false);
+    createFiles(3, 10, 5, 1000, 7500, 7500, 100, 100, false, true);
+
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+
+    // delete d0 ~ d5 in seq files
+    Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+    for (int d = 0; d < 5; d++) {
+      for (int m = 0; m < 5; m++) {
+        deleteMap.put(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + d + PATH_SEPARATOR + 
"s" + m,
+            new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
+      }
+    }
+    for (TsFileResource resource : seqResources) {
+      CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
+    }
+
+    List<PartialPath> timeseriesPaths = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        timeseriesPaths.add(
+            new MeasurementPath(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR 
+ "s" + j,
+                TSDataType.INT64));
+      }
+    }
+    Map<PartialPath, List<TimeValuePair>> sourceData =
+        readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+    // inner seq space compact
+    SizeTieredCompactionSelector sizeTieredCompactionSelector =
+        new SizeTieredCompactionSelector(
+            COMPACTION_TEST_SG, "0", 0, tsFileManager, true, new 
InnerSpaceCompactionTaskFactory());
+
+    PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
+        new PriorityQueue<>(new 
SizeTieredCompactionSelector.SizeTieredCompactionTaskComparator());
+    sizeTieredCompactionSelector.selectLevelTask(0, taskPriorityQueue);
+    ;
+    for (Pair<List<TsFileResource>, Long> taskResource : taskPriorityQueue) {
+      new SizeTieredCompactionTask(
+              COMPACTION_TEST_SG,
+              "0",
+              0,
+              tsFileManager,
+              taskResource.left,
+              true,
+              new AtomicInteger(0))
+          .call();
+    }
+
+    // select cross compaction
+    CrossSpaceCompactionResource crossSpaceCompactionResource =
+        new CrossSpaceCompactionResource(
+            tsFileManager.getTsFileList(true), 
tsFileManager.getTsFileList(false));
+    RewriteCompactionFileSelector crossSpaceCompactionSelector =
+        new RewriteCompactionFileSelector(crossSpaceCompactionResource, 
Long.MAX_VALUE);
+    List[] pairs = crossSpaceCompactionSelector.select();
+    Assert.assertEquals(2, pairs.length);
+    Assert.assertEquals(1, pairs[0].size());
+    Assert.assertEquals(2, pairs[1].size());
+
+    new RewriteCrossSpaceCompactionTask(
+            "0", COMPACTION_TEST_SG, 0, tsFileManager, pairs[0], pairs[1], new 
AtomicInteger(0), 0)
+        .call();
+
+    validateSeqFiles();
+    validateTargetDatas(sourceData, Collections.emptyList());
+  }
+
+  @Test
+  public void testNonAlignedUnseqFilesNotOverlapWithSeqFiles2() throws 
Exception {
+    
IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
+    createFiles(5, 10, 5, 1000, 0, 0, 100, 100, false, true);
+    createFiles(2, 5, 10, 500, 6000, 6000, 0, 100, false, false);
+
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+
+    // delete d0 ~ d5 in seq files
+    Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+    for (int d = 0; d < 5; d++) {
+      for (int m = 0; m < 5; m++) {
+        deleteMap.put(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + d + PATH_SEPARATOR + 
"s" + m,
+            new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
+      }
+    }
+    for (TsFileResource resource : seqResources) {
+      CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
+    }
+
+    List<PartialPath> timeseriesPaths = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        timeseriesPaths.add(
+            new MeasurementPath(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR 
+ "s" + j,
+                TSDataType.INT64));
+      }
+    }
+    Map<PartialPath, List<TimeValuePair>> sourceData =
+        readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+    // inner seq space compact
+    SizeTieredCompactionSelector sizeTieredCompactionSelector =
+        new SizeTieredCompactionSelector(
+            COMPACTION_TEST_SG, "0", 0, tsFileManager, true, new 
InnerSpaceCompactionTaskFactory());
+
+    PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
+        new PriorityQueue<>(new 
SizeTieredCompactionSelector.SizeTieredCompactionTaskComparator());
+    sizeTieredCompactionSelector.selectLevelTask(0, taskPriorityQueue);
+    ;
+    for (Pair<List<TsFileResource>, Long> taskResource : taskPriorityQueue) {
+      new SizeTieredCompactionTask(
+              COMPACTION_TEST_SG,
+              "0",
+              0,
+              tsFileManager,
+              taskResource.left,
+              true,
+              new AtomicInteger(0))
+          .call();
+    }
+
+    // select cross compaction
+    CrossSpaceCompactionResource crossSpaceCompactionResource =
+        new CrossSpaceCompactionResource(
+            tsFileManager.getTsFileList(true), 
tsFileManager.getTsFileList(false));
+    RewriteCompactionFileSelector crossSpaceCompactionSelector =
+        new RewriteCompactionFileSelector(crossSpaceCompactionResource, 
Long.MAX_VALUE);
+    List[] pairs = crossSpaceCompactionSelector.select();
+    Assert.assertEquals(2, pairs.length);
+    Assert.assertEquals(1, pairs[0].size());
+    Assert.assertEquals(2, pairs[1].size());
+
+    new RewriteCrossSpaceCompactionTask(
+            "0", COMPACTION_TEST_SG, 0, tsFileManager, pairs[0], pairs[1], new 
AtomicInteger(0), 0)
+        .call();
+
+    validateSeqFiles();
+    validateTargetDatas(sourceData, Collections.emptyList());
+  }
+
+  @Test
+  public void testNonAlignedUnseqFilesNotOverlapWithSeqFiles3() throws 
Exception {
+    
IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
+    createFiles(4, 10, 5, 1000, 0, 0, 100, 100, false, true);
+    createFiles(2, 5, 10, 500, 6000, 6000, 0, 100, false, false);
+    createFiles(1, 10, 5, 1000, 7500, 7500, 100, 100, false, true);
+
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+
+    // delete d0 ~ d5 in seq files
+    Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+    for (int d = 0; d < 5; d++) {
+      for (int m = 0; m < 5; m++) {
+        deleteMap.put(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + d + PATH_SEPARATOR + 
"s" + m,
+            new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
+      }
+    }
+    for (TsFileResource resource : seqResources) {
+      CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
+    }
+
+    List<PartialPath> timeseriesPaths = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        timeseriesPaths.add(
+            new MeasurementPath(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR 
+ "s" + j,
+                TSDataType.INT64));
+      }
+    }
+    Map<PartialPath, List<TimeValuePair>> sourceData =
+        readSourceFiles(timeseriesPaths, Collections.emptyList());
+
+    // inner seq space compact
+    SizeTieredCompactionSelector sizeTieredCompactionSelector =
+        new SizeTieredCompactionSelector(
+            COMPACTION_TEST_SG, "0", 0, tsFileManager, true, new 
InnerSpaceCompactionTaskFactory());
+
+    PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
+        new PriorityQueue<>(new 
SizeTieredCompactionSelector.SizeTieredCompactionTaskComparator());
+    sizeTieredCompactionSelector.selectLevelTask(0, taskPriorityQueue);
+    for (Pair<List<TsFileResource>, Long> taskResource : taskPriorityQueue) {
+      new SizeTieredCompactionTask(
+              COMPACTION_TEST_SG,
+              "0",
+              0,
+              tsFileManager,
+              taskResource.left,
+              true,
+              new AtomicInteger(0))
+          .call();
+    }
+
+    // select cross compaction
+    CrossSpaceCompactionResource crossSpaceCompactionResource =
+        new CrossSpaceCompactionResource(
+            tsFileManager.getTsFileList(true), 
tsFileManager.getTsFileList(false));
+    RewriteCompactionFileSelector crossSpaceCompactionSelector =
+        new RewriteCompactionFileSelector(crossSpaceCompactionResource, 
Long.MAX_VALUE);
+    List[] pairs = crossSpaceCompactionSelector.select();
+    Assert.assertEquals(2, pairs.length);
+    Assert.assertEquals(1, pairs[0].size());
+    Assert.assertEquals(2, pairs[1].size());
+
+    new RewriteCrossSpaceCompactionTask(
+            "0", COMPACTION_TEST_SG, 0, tsFileManager, pairs[0], pairs[1], new 
AtomicInteger(0), 0)
+        .call();
+
+    validateSeqFiles();
+    validateTargetDatas(sourceData, Collections.emptyList());
+  }
 }

Reply via email to