This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 75d7d4c  [To rel/0.11] Cherry pick 
add_continuous_compaction_in_level_compaction_strategy (#3187)
75d7d4c is described below

commit 75d7d4c3d33e321b1781ffcf736ece0e9d66fb27
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Fri May 14 14:07:16 2021 +0800

    [To rel/0.11] Cherry pick 
add_continuous_compaction_in_level_compaction_strategy (#3187)
---
 .../resources/conf/iotdb-engine.properties         |   4 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  21 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  21 +-
 .../db/engine/compaction/TsFileManagement.java     |   9 +-
 .../level/LevelCompactionTsFileManagement.java     |  56 ++---
 .../engine/storagegroup/StorageGroupProcessor.java |  18 +-
 .../compaction/LevelCompactionCacheTest.java       |   3 +-
 .../engine/compaction/LevelCompactionLogTest.java  |   7 +-
 .../compaction/LevelCompactionMergeTest.java       |  74 ++++++-
 .../compaction/LevelCompactionMoreDataTest.java    | 238 +++++++++++++++++++++
 .../db/engine/compaction/LevelCompactionTest.java  |   3 +-
 .../NoCompactionTsFileManagementTest.java          |   7 +-
 .../org/apache/iotdb/db/script/EnvScriptIT.java    |   6 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   6 +-
 14 files changed, 416 insertions(+), 57 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 2bae439..26dc1ed 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -300,6 +300,10 @@ compaction_strategy=LEVEL_COMPACTION
 enable_unseq_compaction = true
 
 # Works when the compaction_strategy is LEVEL_COMPACTION.
+# Whether to start next compaction task automatically after finish one 
compaction task
+# enable_continuous_compaction=true
+
+# Works when the compaction_strategy is LEVEL_COMPACTION.
 # The max seq file num of each level.
 # When the num of files in one level exceeds this,
 # the files in this level will merge to one and put to upper level.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4a6fec5..1f706f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -318,10 +318,15 @@ public class IoTDBConfig {
   private boolean enableUnseqCompaction = true;
 
   /**
-   * Works when the compaction_strategy is LEVEL_COMPACTION.
-   * The max seq file num of each level.
-   * When the num of files in one level exceeds this,
-   * the files in this level will merge to one and put to upper level.
+   * Works when the compaction_strategy is LEVEL_COMPACTION. Whether to start 
next compaction task
+   * automatically after finish one compaction task
+   */
+  private boolean enableContinuousCompaction = true;
+
+  /**
+   * Works when the compaction_strategy is LEVEL_COMPACTION. The max seq file 
num of each level.
+   * When the num of files in one level exceeds this, the files in this level 
will merge to one and
+   * put to upper level.
    */
   private int seqFileNumInEachLevel = 6;
 
@@ -1479,6 +1484,14 @@ public class IoTDBConfig {
     this.enableUnseqCompaction = enableUnseqCompaction;
   }
 
+  public boolean isEnableContinuousCompaction() {
+    return enableContinuousCompaction;
+  }
+
+  public void setEnableContinuousCompaction(boolean 
enableContinuousCompaction) {
+    this.enableContinuousCompaction = enableContinuousCompaction;
+  }
+
   public int getSeqFileNumInEachLevel() {
     return seqFileNumInEachLevel;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 0aa0c19..98f5ab0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -279,9 +279,17 @@ public class IoTDBDescriptor {
         conf.setMemtableSizeThreshold(memTableSizeThreshold);
       }
 
-      conf.setAvgSeriesPointNumberThreshold(Integer.parseInt(properties
-          .getProperty("avg_series_point_number_threshold",
-              Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
+      conf.setAvgSeriesPointNumberThreshold(
+          Integer.parseInt(
+              properties.getProperty(
+                  "avg_series_point_number_threshold",
+                  Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
+
+      conf.setEnableContinuousCompaction(
+          Boolean.parseBoolean(
+              properties.getProperty(
+                  "enable_continuous_compaction",
+                  Boolean.toString(conf.isEnableContinuousCompaction()))));
 
       conf.setCheckPeriodWhenInsertBlocked(Integer.parseInt(properties
           .getProperty("check_period_when_insert_blocked",
@@ -760,6 +768,13 @@ public class IoTDBDescriptor {
       // update debug_state
       
conf.setDebugState(Boolean.parseBoolean(properties.getProperty("debug_state")));
 
+      // update enable_continuous_compaction
+      conf.setEnableContinuousCompaction(
+          
Boolean.parseBoolean(properties.getProperty("enable_continuous_compaction")));
+
+      // update merge_write_throughput_mb_per_sec
+      conf.setMergeWriteThroughputMbPerSec(
+          
Integer.parseInt(properties.getProperty("merge_write_throughput_mb_per_sec")));
     } catch (Exception e) {
       throw new QueryProcessException(
           String.format("Fail to reload configuration because %s", e));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 226f686..0cf1e74 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -68,6 +68,9 @@ public abstract class TsFileManagement {
 
   private long mergeStartTime;
 
+  /** whether execute merge chunk in this task */
+  protected boolean isMergeExecutedInCurrentTask = false;
+
   protected boolean isForceFullMerge = 
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
 
   public TsFileManagement(String storageGroupName, String storageGroupDir) {
@@ -161,7 +164,7 @@ public abstract class TsFileManagement {
     @Override
     public void run() {
       merge(timePartitionId);
-      closeCompactionMergeCallBack.call();
+      closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, 
timePartitionId);
     }
   }
 
@@ -176,7 +179,9 @@ public abstract class TsFileManagement {
     @Override
     public void run() {
       recover();
-      closeCompactionMergeCallBack.call();
+      // in recover logic, we do not have to start next compaction task, and 
in this case the param
+      // time partition is useless, we can just pass 0L
+      closeCompactionMergeCallBack.call(false, 0L);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 61ba9b4..7a41518 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -533,25 +533,21 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
       forkTsFileList(
           forkedSequenceTsFileResources,
           sequenceTsFileResources.computeIfAbsent(timePartition, 
this::newSequenceTsFileResources),
-          seqLevelNum,
-          seqFileNumInEachLevel);
+          seqLevelNum);
     }
     // we have to copy all unseq file
     synchronized (unSequenceTsFileResources) {
+      // we have to copy all unseq file
       forkTsFileList(
           forkedUnSequenceTsFileResources,
           unSequenceTsFileResources.computeIfAbsent(
               timePartition, this::newUnSequenceTsFileResources),
-          unseqLevelNum + 1,
-          unseqFileNumInEachLevel);
+          unseqLevelNum + 1);
     }
   }
 
   private void forkTsFileList(
-      List<List<TsFileResource>> forkedTsFileResources,
-      List rawTsFileResources,
-      int currMaxLevel,
-      int currFileNumInEachLevel) {
+      List<List<TsFileResource>> forkedTsFileResources, List 
rawTsFileResources, int currMaxLevel) {
     forkedTsFileResources.clear();
     for (int i = 0; i < currMaxLevel - 1; i++) {
       List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
@@ -560,9 +556,6 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
       for (TsFileResource tsFileResource : levelRawTsFileResources) {
         if (tsFileResource.isClosed()) {
           forkedLevelTsFileResources.add(tsFileResource);
-          if (forkedLevelTsFileResources.size() > currFileNumInEachLevel) {
-            break;
-          }
         }
       }
       forkedTsFileResources.add(forkedLevelTsFileResources);
@@ -571,25 +564,31 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
 
   @Override
   protected void merge(long timePartition) {
-    merge(forkedSequenceTsFileResources, true, timePartition, seqLevelNum, 
seqFileNumInEachLevel);
-    if (enableUnseqCompaction && unseqLevelNum <= 1 && 
forkedUnSequenceTsFileResources.size() > 0) {
+    isMergeExecutedInCurrentTask =
+        merge(
+            forkedSequenceTsFileResources, true, timePartition, seqLevelNum, 
seqFileNumInEachLevel);
+    if (enableUnseqCompaction
+        && unseqLevelNum <= 1
+        && forkedUnSequenceTsFileResources.get(0).size() > 0) {
+      isMergeExecutedInCurrentTask = true;
       merge(
           isForceFullMerge,
-          getTsFileList(true),
+          getTsFileListByTimePartition(true, timePartition),
           forkedUnSequenceTsFileResources.get(0),
           Long.MAX_VALUE);
     } else {
-      merge(
-          forkedUnSequenceTsFileResources,
-          false,
-          timePartition,
-          unseqLevelNum,
-          unseqFileNumInEachLevel);
+      isMergeExecutedInCurrentTask =
+          merge(
+              forkedUnSequenceTsFileResources,
+              false,
+              timePartition,
+              unseqLevelNum,
+              unseqFileNumInEachLevel);
     }
   }
 
   @SuppressWarnings("squid:S3776")
-  private void merge(
+  private boolean merge(
       List<List<TsFileResource>> mergeResources,
       boolean sequence,
       long timePartition,
@@ -602,21 +601,26 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
       } catch (InterruptedException e) {
         logger.error("{} [Compaction] shutdown", storageGroupName, e);
         Thread.currentThread().interrupt();
-        return;
+        return false;
       }
     }
     isSeqMerging = true;
     long startTimeMillis = System.currentTimeMillis();
+    // whether execute merge chunk in the loop below
+    boolean isMergeExecutedInCurrentTask = false;
     try {
       logger.info("{} start to filter compaction condition", storageGroupName);
       for (int i = 0; i < currMaxLevel - 1; i++) {
-        if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
+        List<TsFileResource> currLevelTsFileResource = mergeResources.get(i);
+        if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) {
+          // just merge part of the file
+          isMergeExecutedInCurrentTask = true;
           // level is numbered from 0
           if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
             // do not merge current unseq file level to upper level and just 
merge all of them to
             // seq file
             isSeqMerging = false;
-            merge(isForceFullMerge, getTsFileList(true), 
mergeResources.get(i), Long.MAX_VALUE);
+            merge(isForceFullMerge, getTsFileListByTimePartition(true, 
timePartition), mergeResources.get(i), Long.MAX_VALUE);
           } else {
             CompactionLogger compactionLogger =
                 new CompactionLogger(storageGroupDir, storageGroupName);
@@ -628,7 +632,8 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
                 createNewTsFileName(mergeResources.get(i).get(0).getTsFile(), 
i + 1);
             compactionLogger.logSequence(sequence);
             compactionLogger.logFile(TARGET_NAME, newLevelFile);
-            List<TsFileResource> toMergeTsFiles = mergeResources.get(i);
+            List<TsFileResource> toMergeTsFiles =
+                mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
             logger.info(
                 "{} [Compaction] merge level-{}'s {} TsFiles to next level",
                 storageGroupName,
@@ -692,6 +697,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
           sequence,
           System.currentTimeMillis() - startTimeMillis);
     }
+    return isMergeExecutedInCurrentTask;
   }
 
   /** if level < maxLevel-1, the file need compaction else, the file can be 
merged later */
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 4fd2d0f..1440a51 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -354,7 +354,8 @@ public class StorageGroupProcessor {
       throw new StorageGroupProcessorException(e);
     }
 
-    for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
+    List<TsFileResource> seqTsFileResources = 
tsFileManagement.getTsFileList(true);
+    for (TsFileResource resource : seqTsFileResources) {
       long timePartitionId = resource.getTimePartition();
       Map<String, Long> endTimeMap = new HashMap<>();
       for (Entry<String, Integer> entry : 
resource.getDeviceToIndexMap().entrySet()) {
@@ -382,7 +383,7 @@ public class StorageGroupProcessor {
             .submitTask(
                 tsFileManagement.new 
CompactionRecoverTask(this::closeCompactionMergeCallBack));
       } catch (RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack();
+        this.closeCompactionMergeCallBack(false, 0);
         logger.error("{} compaction submit task failed", storageGroupName);
       }
     } else {
@@ -1872,7 +1873,7 @@ public class StorageGroupProcessor {
                 tsFileManagement
                 .new CompactionMergeTask(this::closeCompactionMergeCallBack, 
timePartition));
       } catch (IOException | RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack();
+        this.closeCompactionMergeCallBack(false, timePartition);
         logger.error("{} compaction submit task failed", storageGroupName);
       }
     } else {
@@ -1881,8 +1882,13 @@ public class StorageGroupProcessor {
   }
 
   /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack() {
-    this.compactionMergeWorking = false;
+  private void closeCompactionMergeCallBack(boolean isMerge, long 
timePartitionId) {
+    if (isMerge && 
IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+      executeCompaction(
+          timePartitionId, 
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    } else {
+      this.compactionMergeWorking = false;
+    }
   }
 
   /**
@@ -2737,7 +2743,7 @@ public class StorageGroupProcessor {
   @FunctionalInterface
   public interface CloseCompactionMergeCallBack {
 
-    void call();
+    void call(boolean isMergeExecutedInCurrentTask, long timePartitionId);
   }
 
   @FunctionalInterface
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index 4169816..6c03e4f 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -119,7 +119,8 @@ public class LevelCompactionCacheTest extends 
LevelCompactionTest {
   }
 
   /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack() {
+  private void closeCompactionMergeCallBack(
+      boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
index a6d1647..8bb4e9c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
@@ -73,10 +73,9 @@ public class LevelCompactionLogTest extends 
LevelCompactionTest {
     assertFalse(logFile.exists());
   }
 
-  /**
-   * close compaction merge callback, to release some locks
-   */
-  private void closeCompactionMergeCallBack() {
+  /** close compaction merge callback, to release some locks */
+  private void closeCompactionMergeCallBack(
+      boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 8970a4c..4a6632c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -24,11 +24,16 @@ import static org.junit.Assert.assertEquals;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import 
org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
 import 
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -133,10 +138,71 @@ public class LevelCompactionMergeTest extends 
LevelCompactionTest {
     IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
   }
 
-  /**
-   * close compaction merge callback, to release some locks
-   */
-  private void closeCompactionMergeCallBack() {
+  @Test
+  public void testCompactionModsByOffsetAfterMerge() throws 
IllegalPathException, IOException {
+    int prevPageLimit =
+        
IoTDBDescriptor.getInstance().getConfig().getMergePagePointNumberThreshold();
+    
IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(1);
+
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, 
tempSGDir.getPath());
+    TsFileResource forthSeqTsFileResource = seqResources.get(3);
+    PartialPath path =
+        new PartialPath(
+            deviceIds[0]
+                + TsFileConstant.PATH_SEPARATOR
+                + measurementSchemas[0].getMeasurementId());
+    try (ModificationFile sourceModificationFile =
+        new ModificationFile(
+            forthSeqTsFileResource.getTsFilePath() + 
ModificationFile.FILE_SUFFIX)) {
+      Modification modification =
+          new Deletion(path, forthSeqTsFileResource.getTsFileSize() / 10, 300, 
310);
+      sourceModificationFile.write(modification);
+    }
+    levelCompactionTsFileManagement.addAll(seqResources, true);
+    levelCompactionTsFileManagement.addAll(unseqResources, false);
+    levelCompactionTsFileManagement.forkCurrentFileList(0);
+    CompactionMergeTask compactionMergeTask =
+        levelCompactionTsFileManagement
+        .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+    compactionMergeWorking = true;
+    compactionMergeTask.run();
+    while (compactionMergeWorking) {
+      // wait
+    }
+    QueryContext context = new QueryContext();
+    IBatchReader tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchemas[0].getType(),
+            context,
+            levelCompactionTsFileManagement.getTsFileList(true),
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+
+    long count = 0L;
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        System.out.println(batchData.getTimeByIndex(i));
+      }
+      count += batchData.length();
+    }
+    assertEquals(489, count);
+
+    List<TsFileResource> tsFileResourceList = 
levelCompactionTsFileManagement.getTsFileList(true);
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      tsFileResource.getModFile().remove();
+      tsFileResource.remove();
+    }
+    
IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(prevPageLimit);
+  }
+
+  /** close compaction merge callback, to release some locks */
+  private void closeCompactionMergeCallBack(
+      boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
new file mode 100644
index 0000000..565741a
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.constant.TestConstant;
+import 
org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
+import 
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+
+public class LevelCompactionMoreDataTest extends LevelCompactionTest {
+
+  protected int measurementNum = 3000;
+
+  File tempSGDir;
+  boolean compactionMergeWorking = false;
+
+  @Override
+  protected void prepareSeries() throws MetadataException {
+    measurementSchemas = new MeasurementSchema[measurementNum];
+    for (int i = 0; i < measurementNum; i++) {
+      measurementSchemas[i] =
+          new MeasurementSchema(
+              "sensor" + i, TSDataType.DOUBLE, encoding, 
CompressionType.UNCOMPRESSED);
+    }
+    deviceIds = new String[deviceNum];
+    for (int i = 0; i < deviceNum; i++) {
+      deviceIds[i] = COMPACTION_TEST_SG + PATH_SEPARATOR + "device" + i;
+    }
+    IoTDB.metaManager.setStorageGroup(new PartialPath(COMPACTION_TEST_SG));
+    for (String device : deviceIds) {
+      for (MeasurementSchema measurementSchema : measurementSchemas) {
+        PartialPath devicePath = new PartialPath(device);
+        IoTDB.metaManager.createTimeseries(
+            devicePath.concatNode(measurementSchema.getMeasurementId()),
+            measurementSchema.getType(),
+            measurementSchema.getEncodingType(),
+            measurementSchema.getCompressor(),
+            Collections.emptyMap());
+      }
+    }
+  }
+
+  @Override
+  void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, 
WriteProcessException {
+    for (int i = 0; i < seqFileNum; i++) {
+      File file =
+          new File(
+              TestConstant.BASE_OUTPUT_PATH.concat(
+                  i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + ".tsfile"));
+      TsFileResource tsFileResource = new TsFileResource(file);
+      tsFileResource.setClosed(true);
+      tsFileResource.updatePlanIndexes((long) i);
+      seqResources.add(tsFileResource);
+      prepareFile(tsFileResource, i * ptNum, ptNum, 0);
+    }
+    for (int i = 0; i < unseqFileNum; i++) {
+      File file =
+          new File(
+              TestConstant.BASE_OUTPUT_PATH.concat(
+                  (10000 + i)
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + (10000 + i)
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + ".tsfile"));
+      TsFileResource tsFileResource = new TsFileResource(file);
+      tsFileResource.setClosed(true);
+      tsFileResource.updatePlanIndexes((long) (i + seqFileNum));
+      unseqResources.add(tsFileResource);
+      prepareFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum, 
10000);
+    }
+
+    File file =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                unseqFileNum
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + unseqFileNum
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource = new TsFileResource(file);
+    tsFileResource.setClosed(true);
+    tsFileResource.updatePlanIndexes((long) (seqFileNum + unseqFileNum));
+    unseqResources.add(tsFileResource);
+    prepareFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
+  }
+
+  @Override
+  void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, 
long valueOffset)
+      throws IOException, WriteProcessException {
+    TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile());
+    for (String deviceId : deviceIds) {
+      for (MeasurementSchema measurementSchema : measurementSchemas) {
+        fileWriter.registerTimeseries(
+            new Path(deviceId, measurementSchema.getMeasurementId()), 
measurementSchema);
+      }
+    }
+    for (long i = timeOffset; i < timeOffset + ptNum; i++) {
+      for (int j = 0; j < deviceNum; j++) {
+        TSRecord record = new TSRecord(i, deviceIds[j]);
+        for (int k = 0; k < measurementNum; k++) {
+          record.addTuple(
+              DataPoint.getDataPoint(
+                  measurementSchemas[k].getType(),
+                  measurementSchemas[k].getMeasurementId(),
+                  String.valueOf(i + valueOffset + k)));
+        }
+        fileWriter.write(record);
+        tsFileResource.updateStartTime(deviceIds[j], i);
+        tsFileResource.updateEndTime(deviceIds[j], i);
+      }
+      if ((i + 1) % flushInterval == 0) {
+        fileWriter.flushAllChunkGroups();
+      }
+    }
+    fileWriter.close();
+  }
+
+  @Before
+  public void setUp() throws IOException, WriteProcessException, 
MetadataException {
+    super.setUp();
+    tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+    tempSGDir.mkdirs();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    FileUtils.deleteDirectory(tempSGDir);
+  }
+
+  // test file compaction larger than 1024 sensor
+  @Test
+  public void testSensorWithTwoOrThreeNode() throws IllegalPathException, 
IOException {
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, 
tempSGDir.getPath());
+    levelCompactionTsFileManagement.addAll(seqResources, true);
+    levelCompactionTsFileManagement.addAll(unseqResources, false);
+    levelCompactionTsFileManagement.forkCurrentFileList(0);
+    CompactionMergeTask compactionMergeTask =
+        levelCompactionTsFileManagement
+        .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+    compactionMergeWorking = true;
+    compactionMergeTask.run();
+    while (compactionMergeWorking) {
+      // wait
+    }
+    QueryContext context = new QueryContext();
+    PartialPath path =
+        new PartialPath(
+            deviceIds[0]
+                + TsFileConstant.PATH_SEPARATOR
+                + measurementSchemas[2688].getMeasurementId());
+    IBatchReader tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchemas[2688].getType(),
+            context,
+            levelCompactionTsFileManagement.getTsFileList(true),
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        assertEquals(batchData.getTimeByIndex(i) + 2688, 
batchData.getDoubleByIndex(i), 0.001);
+      }
+    }
+  }
+
+  /** close compaction merge callback, to release some locks */
+  private void closeCompactionMergeCallBack(
+      boolean isMergeExecutedInCurrentTask, long timePartitionId) {
+    this.compactionMergeWorking = false;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index 7fbc509..dc71ea4 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -97,7 +96,7 @@ abstract class LevelCompactionTest {
     EnvironmentUtils.cleanAllDir();
   }
 
-  private void prepareSeries() throws MetadataException {
+  protected void prepareSeries() throws MetadataException {
     measurementSchemas = new MeasurementSchema[measurementNum];
     for (int i = 0; i < measurementNum; i++) {
       measurementSchemas[i] = new MeasurementSchema("sensor" + i, 
TSDataType.DOUBLE,
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
index f99ec45..e3f21f3 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
@@ -113,9 +113,10 @@ public class NoCompactionTsFileManagementTest extends 
LevelCompactionTest {
                 + ".tsfile"))), false);
     noCompactionTsFileManagement.forkCurrentFileList(0);
     noCompactionTsFileManagement.recover();
-    CompactionMergeTask compactionMergeTask = noCompactionTsFileManagement.new 
CompactionMergeTask(
-        () -> {
-        }, 0);
+    CompactionMergeTask compactionMergeTask =
+        noCompactionTsFileManagement
+        .new CompactionMergeTask(
+            (boolean isMergeExecutedInCurrentTask, long timePartitionId) -> 
{}, 0);
     compactionMergeTask.run();
     assertEquals(1, noCompactionTsFileManagement.size(true));
     assertEquals(1, noCompactionTsFileManagement.size(false));
diff --git a/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java 
b/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
index 604a305..90142a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
@@ -21,7 +21,11 @@ package org.apache.iotdb.db.script;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 7ad89b2..5ac96f4 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -135,8 +135,10 @@ public class TsFileIOWriter {
   }
 
   protected void startFile() throws IOException {
-    out.write(magicStringBytes);
-    out.write(versionNumberBytes);
+    if (out != null) {
+      out.write(magicStringBytes);
+      out.write(versionNumberBytes);
+    }
   }
 
   public void startChunkGroup(String deviceId) throws IOException {

Reply via email to