This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 76a4e43955 [IOTDB-3018] Fix compaction bugs on handling deleted target 
file and service shutdown deadlock (#5693)
76a4e43955 is described below

commit 76a4e43955658be78bfa2b8807c527820a6b545d
Author: BaiJian <[email protected]>
AuthorDate: Fri Apr 29 19:33:40 2022 +0800

    [IOTDB-3018] Fix compaction bugs on handling deleted target file and service 
shutdown deadlock (#5693)
---
 .../engine/compaction/CompactionTaskManager.java   | 45 ++++++-----
 .../compaction/cross/CrossSpaceCompactionTask.java |  2 +-
 .../compaction/inner/InnerSpaceCompactionTask.java | 34 +++++----
 .../impl/ReadPointCompactionPerformer.java         |  5 +-
 .../compaction/task/AbstractCompactionTask.java    | 19 ++---
 .../compaction/task/CompactionTaskSummary.java     | 32 ++++++++
 .../compaction/CompactionTaskManagerTest.java      |  2 +-
 .../inner/InnerCompactionEmptyTsFileTest.java      | 89 ++++++++++++++++++++++
 .../compaction/inner/InnerCompactionTest.java      | 63 +--------------
 9 files changed, 178 insertions(+), 113 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 2c159d8331..df314abffa 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.compaction.comparator.DefaultCompactionTaskComparatorImpl;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
 import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
 import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 
 import com.google.common.util.concurrent.RateLimiter;
@@ -69,8 +70,8 @@ public class CompactionTaskManager implements IService {
       new FixedPriorityBlockingQueue<>(1024, new 
DefaultCompactionTaskComparatorImpl());
   // <fullStorageGroupName,futureSet>, it is used to store all compaction 
tasks under each
   // virtualStorageGroup
-  private Map<String, Map<AbstractCompactionTask, Future<Void>>> 
storageGroupTasks =
-      new HashMap<>();
+  private Map<String, Map<AbstractCompactionTask, 
Future<CompactionTaskSummary>>>
+      storageGroupTasks = new HashMap<>();
 
   // The thread pool that periodically fetches and executes the compaction 
task from
   // candidateCompactionTaskQueue to taskExecutionPool. The default number of 
threads for this pool
@@ -124,7 +125,7 @@ public class CompactionTaskManager implements IService {
   }
 
   @Override
-  public synchronized void stop() {
+  public void stop() {
     if (taskExecutionPool != null) {
       taskExecutionPool.shutdownNow();
       compactionTaskSubmissionThreadPool.shutdownNow();
@@ -136,24 +137,24 @@ public class CompactionTaskManager implements IService {
   }
 
   @Override
-  public synchronized void waitAndStop(long milliseconds) {
+  public void waitAndStop(long milliseconds) {
     if (taskExecutionPool != null) {
       awaitTermination(taskExecutionPool, milliseconds);
       awaitTermination(compactionTaskSubmissionThreadPool, milliseconds);
-      logger.info("Waiting for task taskExecutionPool to shut down");
+      logger.info("Waiting for task taskExecutionPool to shut down in {} ms", 
milliseconds);
       waitTermination();
       storageGroupTasks.clear();
     }
   }
 
   @TestOnly
-  public synchronized void waitAllCompactionFinish() {
+  public void waitAllCompactionFinish() {
     long sleepingStartTime = 0;
     if (taskExecutionPool != null) {
       while (taskExecutionPool.getActiveCount() > 0 || 
taskExecutionPool.getQueue().size() > 0) {
         // wait
         try {
-          this.wait(200);
+          Thread.sleep(200);
           sleepingStartTime += 200;
           if (sleepingStartTime % 10000 == 0) {
             logger.warn(
@@ -168,16 +169,17 @@ public class CompactionTaskManager implements IService {
         }
       }
       storageGroupTasks.clear();
+      candidateCompactionTaskQueue.clear();
       logger.info("All compaction task finish");
     }
   }
 
-  private synchronized void waitTermination() {
+  private void waitTermination() {
     long startTime = System.currentTimeMillis();
     while (!taskExecutionPool.isTerminated()) {
       int timeMillis = 0;
       try {
-        this.wait(200);
+        Thread.sleep(200);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -192,7 +194,7 @@ public class CompactionTaskManager implements IService {
     logger.info("CompactionManager stopped");
   }
 
-  private synchronized void awaitTermination(ExecutorService service, long 
milliseconds) {
+  private void awaitTermination(ExecutorService service, long milliseconds) {
     try {
       service.shutdown();
       service.awaitTermination(milliseconds, TimeUnit.MILLISECONDS);
@@ -301,28 +303,28 @@ public class CompactionTaskManager implements IService {
   /**
    * This method will directly submit a task to thread pool if there is 
available thread.
    *
-   * @throws RejectedExecutionException
+   * @return the future of the task.
    */
-  public synchronized void submitTask(AbstractCompactionTask compactionTask)
-      throws RejectedExecutionException {
-    if (taskExecutionPool != null && !taskExecutionPool.isTerminated()) {
-      Future<Void> future = taskExecutionPool.submit(compactionTask);
+  public synchronized Future<CompactionTaskSummary> submitTask(
+      AbstractCompactionTask compactionTask) throws RejectedExecutionException 
{
+    if (taskExecutionPool != null && !taskExecutionPool.isShutdown()) {
+      Future<CompactionTaskSummary> future = 
taskExecutionPool.submit(compactionTask);
       storageGroupTasks
           .computeIfAbsent(compactionTask.getFullStorageGroupName(), x -> new 
HashMap<>())
           .put(compactionTask, future);
-      return;
+      return future;
     }
     logger.warn(
         "A CompactionTask failed to be submitted to CompactionTaskManager 
because {}",
         taskExecutionPool == null
             ? "taskExecutionPool is null"
             : "taskExecutionPool is terminated");
+    return null;
   }
 
   public synchronized Future<Void> submitSubTask(Callable<Void> 
subCompactionTask) {
-    if (subCompactionTaskExecutionPool != null && 
!subCompactionTaskExecutionPool.isTerminated()) {
-      Future<Void> future = 
subCompactionTaskExecutionPool.submit(subCompactionTask);
-      return future;
+    if (subCompactionTaskExecutionPool != null && 
!subCompactionTaskExecutionPool.isShutdown()) {
+      return subCompactionTaskExecutionPool.submit(subCompactionTask);
     }
     return null;
   }
@@ -336,7 +338,7 @@ public class CompactionTaskManager implements IService {
   public synchronized List<AbstractCompactionTask> abortCompaction(String 
storageGroupName) {
     List<AbstractCompactionTask> compactionTaskOfCurSG = new ArrayList<>();
     if (storageGroupTasks.containsKey(storageGroupName)) {
-      for (Map.Entry<AbstractCompactionTask, Future<Void>> taskFutureEntry :
+      for (Map.Entry<AbstractCompactionTask, Future<CompactionTaskSummary>> 
taskFutureEntry :
           storageGroupTasks.get(storageGroupName).entrySet()) {
         taskFutureEntry.getValue().cancel(true);
         compactionTaskOfCurSG.add(taskFutureEntry.getKey());
@@ -367,7 +369,8 @@ public class CompactionTaskManager implements IService {
 
   public synchronized List<AbstractCompactionTask> 
getRunningCompactionTaskList() {
     List<AbstractCompactionTask> tasks = new ArrayList<>();
-    for (Map<AbstractCompactionTask, Future<Void>> taskFutureMap : 
storageGroupTasks.values()) {
+    for (Map<AbstractCompactionTask, Future<CompactionTaskSummary>> 
taskFutureMap :
+        storageGroupTasks.values()) {
       tasks.addAll(taskFutureMap.keySet());
     }
     return tasks;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
index 8f45039fb4..3f3568ea82 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
@@ -149,7 +149,7 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
       }
     } catch (Throwable throwable) {
       // catch throwable instead of exception to handle OOM errors
-      LOGGER.error("Meet errors in cross space compaction, {}", 
throwable.getMessage());
+      LOGGER.error("Meet errors in cross space compaction.");
       CompactionExceptionHandler.handleException(
           fullStorageGroupName,
           logFile,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
index a7b5acaaa3..f2f9092084 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -99,6 +101,8 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
     targetTsFileResource =
         TsFileNameGenerator.getInnerCompactionTargetFileResource(
             selectedTsFileResourceList, sequence);
+    List<TsFileResource> targetTsFileList =
+        new ArrayList<>(Collections.singletonList(targetTsFileResource));
     LOGGER.info(
         "{} [Compaction] starting compaction task with {} files",
         fullStorageGroupName,
@@ -113,8 +117,7 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
     try {
       compactionLogger = new CompactionLogger(logFile);
       compactionLogger.logFiles(selectedTsFileResourceList, 
CompactionLogger.STR_SOURCE_FILES);
-      compactionLogger.logFiles(
-          Collections.singletonList(targetTsFileResource), 
CompactionLogger.STR_TARGET_FILES);
+      compactionLogger.logFiles(targetTsFileList, 
CompactionLogger.STR_TARGET_FILES);
       LOGGER.info("{} [InnerSpaceCompactionTask] Close the logger", 
fullStorageGroupName);
       compactionLogger.close();
       LOGGER.info(
@@ -122,11 +125,12 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
 
       // carry out the compaction
       performer.setSourceFiles(selectedTsFileResourceList);
-      
performer.setTargetFiles(Collections.singletonList(targetTsFileResource));
+      // As elements in targetFiles may be removed in 
ReadPointCompactionPerformer, we should use a
+      // mutable list instead of Collections.singletonList()
+      performer.setTargetFiles(targetTsFileList);
       performer.perform();
 
-      CompactionUtils.moveTargetFile(
-          Collections.singletonList(targetTsFileResource), true, 
fullStorageGroupName);
+      CompactionUtils.moveTargetFile(targetTsFileList, true, 
fullStorageGroupName);
 
       LOGGER.info("{} [InnerSpaceCompactionTask] start to rename mods file", 
fullStorageGroupName);
       CompactionUtils.combineModsInInnerCompaction(
@@ -142,14 +146,14 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
         tsFileManager.replace(
             selectedTsFileResourceList,
             Collections.emptyList(),
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             timePartition,
             true);
       } else {
         tsFileManager.replace(
             Collections.emptyList(),
             selectedTsFileResourceList,
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             timePartition,
             false);
       }
@@ -166,10 +170,11 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
         isHoldingWriteLock[i] = true;
       }
 
-      if (targetTsFileResource.getTsFile().length()
-          < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
+      if (targetTsFileResource.getTsFile().exists()
+          && targetTsFileResource.getTsFile().length()
+              < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) 
{
         // the file size is smaller than magic string and version number
-        throw new RuntimeException(
+        throw new TsFileNotCompleteException(
             String.format(
                 "target file %s is smaller than magic string and version 
number size",
                 targetTsFileResource));
@@ -194,10 +199,6 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
         FileUtils.delete(logFile);
       }
     } catch (Throwable throwable) {
-      LOGGER.error(
-          "{} [Compaction] Throwable is caught during execution of 
SizeTieredCompaction, {}",
-          fullStorageGroupName,
-          throwable.getMessage());
       LOGGER.warn("{} [Compaction] Start to handle exception", 
fullStorageGroupName);
       if (throwable instanceof InterruptedException) {
         Thread.currentThread().interrupt();
@@ -209,7 +210,7 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
         CompactionExceptionHandler.handleException(
             fullStorageGroupName,
             logFile,
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             selectedTsFileResourceList,
             Collections.emptyList(),
             tsFileManager,
@@ -220,7 +221,7 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
         CompactionExceptionHandler.handleException(
             fullStorageGroupName,
             logFile,
-            Collections.singletonList(targetTsFileResource),
+            targetTsFileList,
             Collections.emptyList(),
             selectedTsFileResourceList,
             tsFileManager,
@@ -228,6 +229,7 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
             true,
             isSequence());
       }
+      throw throwable;
     } finally {
       releaseFileLocksAndResetMergingStatus();
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index 43a121eff7..9d7c6789f4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -192,8 +192,7 @@ public class ReadPointCompactionPerformer
       AbstractCompactionWriter compactionWriter,
       QueryContext queryContext,
       QueryDataSource queryDataSource)
-      throws MetadataException, IOException, InterruptedException {
-    boolean hasStartChunkGroup = false;
+      throws IOException, InterruptedException {
     MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
         deviceIterator.iterateNotAlignedSeries(device, false);
     Set<String> allMeasurements = measurementIterator.getAllMeasurements();
@@ -204,7 +203,7 @@ public class ReadPointCompactionPerformer
     int idx = 0;
     for (String measurement : allMeasurements) {
       if (measurementsForEachSubTask[idx % subTaskNums] == null) {
-        measurementsForEachSubTask[idx % subTaskNums] = new HashSet<String>();
+        measurementsForEachSubTask[idx % subTaskNums] = new HashSet<>();
       }
       measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index 3fbf180eb2..0fbe2e7d40 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -32,11 +32,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * AbstractCompactionTask is the base class for all compaction task, it 
carries out the execution of
- * compaction. AbstractCompactionTask uses a template method, it execute the 
abstract function
- * <i>doCompaction</i> implemented by subclass, and decrease the 
currentTaskNum in
- * CompactionScheduler when the <i>doCompaction</i> finish.
+ * compaction. AbstractCompactionTask uses a template method, it executes the 
abstract function
+ * {@link AbstractCompactionTask#doCompaction()} implemented by subclass, and 
decrease the
+ * currentTaskNum in CompactionScheduler when the {@link 
AbstractCompactionTask#doCompaction()} is
+ * finished. The future returns the {@link CompactionTaskSummary} of this task 
execution.
  */
-public abstract class AbstractCompactionTask implements Callable<Void> {
+public abstract class AbstractCompactionTask implements 
Callable<CompactionTaskSummary> {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   protected String fullStorageGroupName;
@@ -64,25 +65,25 @@ public abstract class AbstractCompactionTask implements 
Callable<Void> {
   protected abstract void doCompaction() throws Exception;
 
   @Override
-  public Void call() throws Exception {
+  public CompactionTaskSummary call() throws Exception {
     ran = true;
     long startTime = System.currentTimeMillis();
     currentTaskNum.incrementAndGet();
+    boolean isSuccess = false;
     try {
       doCompaction();
+      isSuccess = true;
     } catch (InterruptedException e) {
       LOGGER.warn("Current task is interrupted");
-      Thread.interrupted();
     } catch (Exception e) {
-      LOGGER.error(e.getMessage(), e);
+      LOGGER.error("Running compaction task failed", e);
     } finally {
       this.currentTaskNum.decrementAndGet();
       CompactionTaskManager.getInstance().removeRunningTaskFuture(this);
       timeCost = System.currentTimeMillis() - startTime;
       finished = true;
     }
-
-    return null;
+    return new CompactionTaskSummary(isSuccess);
   }
 
   public String getFullStorageGroupName() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
new file mode 100644
index 0000000000..a7380969ff
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.task;
+
+/** The summary of one {@link AbstractCompactionTask} execution */
+public class CompactionTaskSummary {
+  private final boolean success;
+
+  public CompactionTaskSummary(boolean success) {
+    this.success = success;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index 977ebd1a43..923e9e3b77 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -210,7 +210,7 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             new ReadChunkCompactionPerformer(seqResources),
             new AtomicInteger(0));
     CompactionTaskManager manager = CompactionTaskManager.getInstance();
-    manager.addTaskToWaitingQueue(task1);
+    Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
     manager.submitTaskFromTaskQueue();
     while (manager.getTotalTaskCount() > 0) {
       Thread.sleep(10);
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
new file mode 100644
index 0000000000..5b1306ff6a
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.inner;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import 
org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InnerCompactionEmptyTsFileTest extends InnerCompactionTest {
+
+  File tempSGDir;
+
+  @Before
+  public void setUp() throws Exception {
+    tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest", 
0, 0));
+    if (!tempSGDir.exists()) {
+      Assert.assertTrue(tempSGDir.mkdirs());
+    }
+    seqFileNum = 0;
+    unseqFileNum = 4;
+    super.setUp();
+    tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+    tempSGDir.mkdirs();
+    tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", 
tempSGDir.getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    FileUtils.deleteDirectory(tempSGDir);
+  }
+
+  @Override
+  void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, 
long valueOffset)
+      throws IOException, WriteProcessException {
+    // create some empty tsfiles
+    super.prepareFile(tsFileResource, 0, 0, 0);
+  }
+
+  @Test
+  public void testCompactWithPartialEmptyUnseqFiles() throws Exception {
+    tsFileManager.addAll(seqResources, true);
+    tsFileManager.addAll(unseqResources, false);
+
+    // Here we compact file 0-2
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0,
+            tsFileManager,
+            unseqResources.subList(0, 3),
+            false,
+            new ReadPointCompactionPerformer(),
+            new AtomicInteger(0));
+    Future<CompactionTaskSummary> future = 
CompactionTaskManager.getInstance().submitTask(task);
+    Assert.assertTrue(future.get().isSuccess());
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
index b74a9a8dc6..11a1b13d7f 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
@@ -74,10 +74,8 @@ public abstract class InnerCompactionTest {
   protected List<TsFileResource> seqResources = new ArrayList<>();
   protected List<TsFileResource> unseqResources = new ArrayList<>();
 
-  private int prevMergeChunkThreshold;
-
   @Before
-  public void setUp() throws IOException, WriteProcessException, 
MetadataException, Exception {
+  public void setUp() throws Exception {
     EnvironmentUtils.envSetUp();
     IoTDB.configManager.init();
     prepareSeries();
@@ -213,63 +211,4 @@ public abstract class InnerCompactionTest {
     }
     fileWriter.close();
   }
-
-  List<TsFileResource> prepareTsFileResources() throws IOException, 
WriteProcessException {
-    List<TsFileResource> ret = new ArrayList<>();
-    // prepare file 1
-    File file1 =
-        new File(
-            TestConstant.getTestTsFileDir("root.compactionTest", 0, 0)
-                .concat(
-                    System.nanoTime()
-                        + IoTDBConstant.FILE_NAME_SEPARATOR
-                        + 0
-                        + IoTDBConstant.FILE_NAME_SEPARATOR
-                        + 0
-                        + IoTDBConstant.FILE_NAME_SEPARATOR
-                        + 0
-                        + ".tsfile"));
-    TsFileResource tsFileResource1 = new TsFileResource(file1);
-    tsFileResource1.setStatus(TsFileResourceStatus.CLOSED);
-    tsFileResource1.updatePlanIndexes((long) 0);
-    TsFileWriter fileWriter1 = new TsFileWriter(tsFileResource1.getTsFile());
-    fileWriter1.registerTimeseries(new Path(deviceIds[0]), 
measurementSchemas[0]);
-    TSRecord record1 = new TSRecord(0, deviceIds[0]);
-    record1.addTuple(
-        DataPoint.getDataPoint(
-            measurementSchemas[0].getType(),
-            measurementSchemas[0].getMeasurementId(),
-            String.valueOf(0)));
-    fileWriter1.write(record1);
-    fileWriter1.close();
-    // prepare file 2
-    File file2 =
-        new File(
-            TestConstant.getTestTsFileDir("root.compactionTest", 0, 0)
-                .concat(
-                    System.nanoTime()
-                        + IoTDBConstant.FILE_NAME_SEPARATOR
-                        + 1
-                        + IoTDBConstant.FILE_NAME_SEPARATOR
-                        + 0
-                        + IoTDBConstant.FILE_NAME_SEPARATOR
-                        + 0
-                        + ".tsfile"));
-    TsFileResource tsFileResource2 = new TsFileResource(file2);
-    tsFileResource2.setStatus(TsFileResourceStatus.CLOSED);
-    tsFileResource2.updatePlanIndexes((long) 1);
-    TsFileWriter fileWriter2 = new TsFileWriter(tsFileResource2.getTsFile());
-    fileWriter2.registerTimeseries(new Path(deviceIds[0]), 
measurementSchemas[1]);
-    TSRecord record2 = new TSRecord(0, deviceIds[0]);
-    record2.addTuple(
-        DataPoint.getDataPoint(
-            measurementSchemas[1].getType(),
-            measurementSchemas[1].getMeasurementId(),
-            String.valueOf(0)));
-    fileWriter2.write(record2);
-    fileWriter2.close();
-    ret.add(tsFileResource1);
-    ret.add(tsFileResource2);
-    return ret;
-  }
 }

Reply via email to