This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch compaction_recover_logger_1017
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7a8c70a2e35bda25e66776c34fe8930744cf7bb9
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Oct 17 17:23:34 2023 +0800

    complete draft version for inner compaction refactor
---
 .../exception/CompactionRecoverException.java      |  30 ++
 .../execute/recover/CompactionRecoverManager.java  |  20 -
 .../execute/recover/CompactionRecoverTask.java     | 238 +---------
 .../execute/task/AbstractCompactionTask.java       |  16 +
 .../execute/task/CrossSpaceCompactionTask.java     |   7 -
 .../execute/task/InnerSpaceCompactionTask.java     | 519 ++++++++++++++-------
 .../compaction/execute/utils/CompactionUtils.java  |  12 +
 .../execute/utils/log/CompactionLogAnalyzer.java   |  95 ----
 .../execute/utils/log/CompactionLogger.java        |  16 +-
 .../execute/utils/log/ICompactionLogger.java       |  26 ++
 .../execute/utils/log/InnerCompactionLogger.java   |  67 +++
 .../execute/utils/log/TsFileIdentifier.java        |  30 --
 12 files changed, 503 insertions(+), 573 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionRecoverException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionRecoverException.java
new file mode 100644
index 00000000000..3c63e3d602d
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionRecoverException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;
+
+public class CompactionRecoverException extends RuntimeException { // unchecked exception
+  public CompactionRecoverException(String msg) { // message only, no cause
+    super(msg);
+  }
+
+  public CompactionRecoverException(String message, Throwable cause) { // keeps the cause chained
+    super(message, cause);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java
index 93576c2fecc..23e078a1f02 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,13 +60,11 @@ public class CompactionRecoverManager {
 
   public void recoverInnerSpaceCompaction(boolean isSequence) {
     logger.info("recovering inner compaction");
-    recoverCompactionBefore013(true);
     recoverCompaction(true, isSequence);
   }
 
   public void recoverCrossSpaceCompaction() {
     logger.info("recovering cross compaction");
-    recoverCompactionBefore013(false);
     recoverCompaction(false, true);
   }
 
@@ -138,21 +135,4 @@ public class CompactionRecoverManager {
           .doCompaction();
     }
   }
-
-  /** Check whether there is old compaction log from previous version (<0.13) 
and recover it. */
-  private void recoverCompactionBefore013(boolean isInnerSpace) {
-    String oldLogName =
-        isInnerSpace
-            ? logicalStorageGroupName + 
CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD
-            : CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD;
-    File logFileFromOld =
-        
FSFactoryProducer.getFSFactory().getFile(tsFileManager.getStorageGroupDir(), 
oldLogName);
-
-    if (logFileFromOld.exists()) {
-      logger.info("Calling compaction task to recover from previous version.");
-      new CompactionRecoverTask(
-              logicalStorageGroupName, dataRegionId, tsFileManager, 
logFileFromOld, isInnerSpace)
-          .doCompaction();
-    }
-  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java
index ac53ad571d3..e04fd69b3c3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java
@@ -25,19 +25,12 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
-import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
-import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.TsFileUtils;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -82,16 +75,7 @@ public class CompactionRecoverTask {
             fullStorageGroupName,
             compactionLogFile);
         CompactionLogAnalyzer logAnalyzer = new 
CompactionLogAnalyzer(compactionLogFile);
-        CompactionRecoverFromOld compactionRecoverFromOld = new 
CompactionRecoverFromOld();
-        if (isInnerSpace && 
compactionRecoverFromOld.isInnerCompactionLogBefore013()) {
-          // inner compaction log from previous version (<0.13)
-          logAnalyzer.analyzeOldInnerCompactionLog();
-        } else if (!isInnerSpace && 
compactionRecoverFromOld.isCrossCompactionLogBefore013()) {
-          // cross compaction log from previous version (<0.13)
-          logAnalyzer.analyzeOldCrossCompactionLog();
-        } else {
-          logAnalyzer.analyze();
-        }
+        logAnalyzer.analyze();
         List<TsFileIdentifier> sourceFileIdentifiers = 
logAnalyzer.getSourceFileInfos();
         List<TsFileIdentifier> targetFileIdentifiers = 
logAnalyzer.getTargetFileInfos();
         List<TsFileIdentifier> deletedTargetFileIdentifiers =
@@ -115,24 +99,13 @@ public class CompactionRecoverTask {
         }
 
         if (isAllSourcesFileExisted) {
-          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
-            recoverSuccess =
-                
compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistBefore013(
-                    targetFileIdentifiers);
-          } else {
-            recoverSuccess =
-                handleWithAllSourceFilesExist(targetFileIdentifiers, 
sourceFileIdentifiers);
-          }
+          recoverSuccess =
+              handleWithAllSourceFilesExist(targetFileIdentifiers, 
sourceFileIdentifiers);
+
         } else {
-          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
-            recoverSuccess =
-                
compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostBefore013(
-                    targetFileIdentifiers, sourceFileIdentifiers);
-          } else {
-            recoverSuccess =
-                handleWithSomeSourceFilesLost(
-                    targetFileIdentifiers, deletedTargetFileIdentifiers, 
sourceFileIdentifiers);
-          }
+          recoverSuccess =
+              handleWithSomeSourceFilesLost(
+                  targetFileIdentifiers, deletedTargetFileIdentifiers, 
sourceFileIdentifiers);
         }
       }
     } catch (IOException e) {
@@ -358,201 +331,4 @@ public class CompactionRecoverTask {
     }
     return true;
   }
-
-  /**
-   * Used to check whether it is recoverd from last version (<0.13) and 
perform corresponding
-   * process.
-   */
-  private class CompactionRecoverFromOld {
-
-    /** Return whether cross compaction log file is from previous version 
(<0.13). */
-    private boolean isCrossCompactionLogBefore013() {
-      return compactionLogFile
-          .getName()
-          .equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
-    }
-
-    /** Return whether inner compaction log file is from previous version 
(<0.13). */
-    private boolean isInnerCompactionLogBefore013() {
-      return 
compactionLogFile.getName().startsWith(tsFileManager.getStorageGroupName());
-    }
-
-    /** Delete tmp target file and compaction mods file. */
-    private boolean handleCrossCompactionWithAllSourceFilesExistBefore013(
-        List<TsFileIdentifier> targetFileIdentifiers) {
-      // delete tmp target file
-      for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
-        // xxx.tsfile.merge
-        File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
-        if (tmpTargetFile != null) {
-          try {
-            Files.delete(tmpTargetFile.toPath());
-          } catch (IOException e) {
-            logger.error(
-                "{} [Compaction][Recover] failed to remove file {}, exception: 
{}",
-                fullStorageGroupName,
-                tmpTargetFile,
-                e);
-          }
-          File chunkMetadataTempFile =
-              new File(
-                  tmpTargetFile.getAbsolutePath() + 
TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
-          if (chunkMetadataTempFile.exists()) {
-            try {
-              Files.delete(chunkMetadataTempFile.toPath());
-            } catch (IOException e) {
-              logger.error(
-                  "{} [Compaction][Recover] failed to remove file {}, 
exception: {}",
-                  fullStorageGroupName,
-                  chunkMetadataTempFile,
-                  e);
-            }
-          }
-        }
-      }
-
-      // delete compaction mods file
-      File compactionModsFileFromOld =
-          new File(
-              tsFileManager.getStorageGroupDir()
-                  + File.separator
-                  + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
-      return checkAndDeleteFile(compactionModsFileFromOld);
-    }
-
-    /**
-     * 1. If target file does not exist, then move .merge file to target file 
<br>
-     * 2. If target resource file does not exist, then serialize it. <br>
-     * 3. Append merging modification to target mods file and delete merging 
mods file. <br>
-     * 4. Delete source files and .merge file. <br>
-     */
-    @SuppressWarnings("squid:S3776")
-    private boolean handleCrossCompactionWithSomeSourceFilesLostBefore013(
-        List<TsFileIdentifier> targetFileIdentifiers,
-        List<TsFileIdentifier> sourceFileIdentifiers) {
-      try {
-        File compactionModsFileFromOld =
-            new File(
-                tsFileManager.getStorageGroupDir()
-                    + File.separator
-                    + 
IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
-        List<TsFileResource> targetFileResources = new ArrayList<>();
-        for (int i = 0; i < sourceFileIdentifiers.size(); i++) {
-          TsFileIdentifier sourceFileIdentifier = sourceFileIdentifiers.get(i);
-          if (sourceFileIdentifier.isSequence()) {
-            File tmpTargetFile = 
targetFileIdentifiers.get(i).getFileFromDataDirs();
-            File targetFile = null;
-
-            // move tmp target file to target file if not exist
-            if (tmpTargetFile != null) {
-              // move tmp target file to target file
-              String sourceFilePath =
-                  tmpTargetFile
-                      .getPath()
-                      .replace(
-                          TsFileConstant.TSFILE_SUFFIX
-                              + 
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
-                          TsFileConstant.TSFILE_SUFFIX);
-              targetFile = TsFileNameGenerator.increaseCrossCompactionCnt(new 
File(sourceFilePath));
-              FSFactoryProducer.getFSFactory().moveFile(tmpTargetFile, 
targetFile);
-            } else {
-              // target file must exist
-              File file =
-                  TsFileNameGenerator.increaseCrossCompactionCnt(
-                      new File(
-                          targetFileIdentifiers
-                              .get(i)
-                              .getFilePath()
-                              .replace(
-                                  TsFileConstant.TSFILE_SUFFIX
-                                      + 
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
-                                  TsFileConstant.TSFILE_SUFFIX)));
-
-              targetFile = getFileFromDataDirs(file.getPath());
-            }
-            if (targetFile == null) {
-              logger.error(
-                  "{} [Compaction][Recover] target file of source seq file {} "
-                      + "does not exist (<0.13).",
-                  fullStorageGroupName,
-                  sourceFileIdentifier.getFilePath());
-              return false;
-            }
-
-            // serialize target resource file if not exist
-            TsFileResource targetResource = new TsFileResource(targetFile);
-            if (!targetResource.resourceFileExists()) {
-              try (TsFileSequenceReader reader =
-                  new TsFileSequenceReader(targetFile.getAbsolutePath())) {
-                FileLoaderUtils.updateTsFileResource(reader, targetResource);
-              }
-              targetResource.serialize();
-            }
-
-            targetFileResources.add(targetResource);
-
-            // append compaction modifications to target mods file and delete 
compaction mods file
-            if (compactionModsFileFromOld.exists()) {
-              ModificationFile compactionModsFile =
-                  new ModificationFile(compactionModsFileFromOld.getPath());
-              appendCompactionModificationsBefore013(targetResource, 
compactionModsFile);
-            }
-
-            // delete tmp target file
-            if (!checkAndDeleteFile(tmpTargetFile)) {
-              return false;
-            }
-          }
-
-          // delete source tsfile
-          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
-          if (!checkAndDeleteFile(sourceFile)) {
-            return false;
-          }
-
-          // delete source resource file
-          sourceFile =
-              getFileFromDataDirs(
-                  sourceFileIdentifier.getFilePath() + 
TsFileResource.RESOURCE_SUFFIX);
-          if (!checkAndDeleteFile(sourceFile)) {
-            return false;
-          }
-
-          // delete source mods file
-          sourceFile =
-              getFileFromDataDirs(
-                  sourceFileIdentifier.getFilePath() + 
ModificationFile.FILE_SUFFIX);
-          if (!checkAndDeleteFile(sourceFile)) {
-            return false;
-          }
-        }
-
-        // delete compaction mods file
-        if (!checkAndDeleteFile(compactionModsFileFromOld)) {
-          return false;
-        }
-      } catch (IOException e) {
-        logger.error(
-            "{} [Compaction][Recover] fail to handle with some source files 
lost from old version.",
-            fullStorageGroupName,
-            e);
-        return false;
-      }
-
-      return true;
-    }
-
-    private void appendCompactionModificationsBefore013(
-        TsFileResource resource, ModificationFile compactionModsFile) throws 
IOException {
-      if (compactionModsFile != null) {
-        for (Modification modification : 
compactionModsFile.getModifications()) {
-          // we have to set modification offset to MAX_VALUE, as the offset of 
source chunk may
-          // change after compaction
-          modification.setFileOffset(Long.MAX_VALUE);
-          resource.getModFile().write(modification);
-        }
-        resource.getModFile().close();
-      }
-    }
-  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 02412723c48..7d9512780cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -28,6 +28,8 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 
+import org.slf4j.Logger;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -108,6 +110,20 @@ public abstract class AbstractCompactionTask {
 
   protected abstract boolean doCompaction();
 
+  protected void printLogWhenException(Logger logger, Exception e) {
+    if (e instanceof InterruptedException) {
+      logger.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId);
+      Thread.currentThread().interrupt(); // re-assert the interrupt flag for callers
+    } else {
+      logger.error(
+          "{}-{} [Compaction] Meet errors {}.", // same "<database>-<region>" prefix as the warn
+          storageGroupName,
+          dataRegionId,
+          compactionTaskType,
+          e); // trailing Throwable has no placeholder; SLF4J logs it with its stack trace
+    }
+  }
+
   public boolean start() {
     boolean isSuccess = false;
     summary.start();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 4f9e23614ce..021e61389db 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -32,7 +32,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 
@@ -49,8 +48,6 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   protected List<TsFileResource> selectedSequenceFiles;
   protected List<TsFileResource> selectedUnsequenceFiles;
-  protected TsFileResourceList seqTsFileResourceList;
-  protected TsFileResourceList unseqTsFileResourceList;
   private File logFile;
   protected List<TsFileResource> targetTsfileResourceList;
   protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
@@ -74,10 +71,6 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
         serialId);
     this.selectedSequenceFiles = selectedSequenceFiles;
     this.selectedUnsequenceFiles = selectedUnsequenceFiles;
-    this.seqTsFileResourceList =
-        tsFileManager.getOrCreateSequenceListByTimePartition(timePartition);
-    this.unseqTsFileResourceList =
-        tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition);
     this.performer = performer;
     this.hashCode = this.toString().hashCode();
     this.memoryCost = memoryCost;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index ccde5bf6623..2dc544b4b57 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -19,29 +19,36 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
 
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.InnerCompactionLogger;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -60,21 +67,23 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
 
   protected List<TsFileResource> selectedTsFileResourceList;
   protected TsFileResource targetTsFileResource;
+  protected boolean isTargetTsFileEmpty;
   protected boolean sequence;
   protected long selectedFileSize;
   protected int sumOfCompactionCount;
   protected long maxFileVersion;
   protected int maxCompactionCount;
-  private File logFile;
 
-  protected TsFileResourceList tsFileResourceList;
+  private File logFile;
+  private InnerCompactionLogger compactionLogger;
   protected List<TsFileResource> targetTsFileList;
   protected boolean[] isHoldingWriteLock;
-
   protected long maxModsFileSize;
-
   protected AbstractInnerSpaceEstimator innerSpaceEstimator;
 
+  protected boolean recoverMemoryStatus;
+  protected boolean needRecoverTaskInfoFromLogFile;
+
   public InnerSpaceCompactionTask(
       long timePartition,
       TsFileManager tsFileManager,
@@ -92,6 +101,30 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
         CompactionTaskType.NORMAL);
   }
 
+  public InnerSpaceCompactionTask( // builds a task from an existing compaction log file
+      String databaseName, String dataRegionId, TsFileManager tsFileManager, File logFile) {
+    super(databaseName, dataRegionId, 0L, tsFileManager, 0L, CompactionTaskType.NORMAL);
+    this.logFile = logFile; // the log that recoverTaskInfoFromLogFile() parses
+    this.needRecoverTaskInfoFromLogFile = true; // file lists are not set by this constructor
+  }
+
+  private void recoverTaskInfoFromLogFile() throws IOException { // rebuild fields from logFile
+    CompactionLogAnalyzer analyzer = new CompactionLogAnalyzer(this.logFile);
+    analyzer.analyze();
+    List<TsFileIdentifier> sourceIds = analyzer.getSourceFileInfos();
+    List<TsFileIdentifier> targetIds = analyzer.getTargetFileInfos();
+    List<TsFileIdentifier> deletedTargetIds = analyzer.getDeletedTargetFileInfos();
+    this.selectedTsFileResourceList = new ArrayList<>();
+    for (TsFileIdentifier sourceId : sourceIds) {
+      this.selectedTsFileResourceList.add(new TsFileResource(sourceId.getFileFromDataDirs()));
+    }
+    if (!targetIds.isEmpty()) {
+      // The resolved target file may be null, but it won't impact the task recover stage
+      this.targetTsFileResource = new TsFileResource(getRealTargetFile(targetIds.get(0)));
+    }
+    this.isTargetTsFileEmpty = !deletedTargetIds.isEmpty();
+  }
+
   public InnerSpaceCompactionTask(
       long timePartition,
       TsFileManager tsFileManager,
@@ -121,16 +154,22 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
     for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
       isHoldingWriteLock[i] = false;
     }
-    if (sequence) {
-      tsFileResourceList = 
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition);
-    } else {
-      tsFileResourceList = 
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition);
-    }
     this.hashCode = this.toString().hashCode();
     this.innerSeqTask = sequence;
     this.crossTask = false;
     collectSelectedFilesInfo();
     createSummary();
+    prepareLogFile();
+  }
+
+  private void prepareLogFile() {
+    // The log path is <dir of first selected file>/<target file name><inner log suffix>.
+    // NOTE(review): this runs from the constructor, while doCompaction() assigns
+    // targetTsFileResource later; confirm the field is non-null by the time we get here.
+    String sourceDir = selectedTsFileResourceList.get(0).getTsFile().getParent();
+    String targetName = targetTsFileResource.getTsFile().getName();
+    String logFileName = targetName + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX;
+    logFile = new File(sourceDir + File.separator + logFileName);
+  }
 
   @Override
@@ -138,7 +177,7 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
   protected boolean doCompaction() {
     long startTime = System.currentTimeMillis();
     // get resource of target file
-    String dataDirectory = 
selectedTsFileResourceList.get(0).getTsFile().getParent();
+    recoverMemoryStatus = true;
     LOGGER.info(
         "{}-{} [Compaction] {} InnerSpaceCompaction task starts with {} files, 
"
             + "total file size is {} MB.",
@@ -153,190 +192,310 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
       targetTsFileResource =
           TsFileNameGenerator.getInnerCompactionTargetFileResource(
               selectedTsFileResourceList, sequence);
-      logFile =
-          new File(
-              dataDirectory
-                  + File.separator
-                  + targetTsFileResource.getTsFile().getName()
-                  + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
-      try (CompactionLogger compactionLogger = new CompactionLogger(logFile)) {
-        // Here is tmpTargetFile, which is xxx.target
-        targetTsFileList = new 
ArrayList<>(Collections.singletonList(targetTsFileResource));
-        compactionLogger.logFiles(selectedTsFileResourceList, 
CompactionLogger.STR_SOURCE_FILES);
-        compactionLogger.logFiles(targetTsFileList, 
CompactionLogger.STR_TARGET_FILES);
-        compactionLogger.force();
-        LOGGER.info(
-            "{}-{} [Compaction] compaction with {}",
-            storageGroupName,
-            dataRegionId,
-            selectedTsFileResourceList);
-
-        // carry out the compaction
-        performer.setSourceFiles(selectedTsFileResourceList);
-        // As elements in targetFiles may be removed in 
ReadPointCompactionPerformer, we should use
-        // a
-        // mutable list instead of Collections.singletonList()
-        performer.setTargetFiles(targetTsFileList);
-        performer.setSummary(summary);
-        performer.perform();
-
-        CompactionUtils.updateProgressIndex(
-            targetTsFileList, selectedTsFileResourceList, 
Collections.emptyList());
-        CompactionUtils.moveTargetFile(
-            targetTsFileList, true, storageGroupName + "-" + dataRegionId);
-
-        LOGGER.info(
-            "{}-{} [InnerSpaceCompactionTask] start to rename mods file",
-            storageGroupName,
-            dataRegionId);
-        CompactionUtils.combineModsInInnerCompaction(
-            selectedTsFileResourceList, targetTsFileResource);
-
-        if (Thread.currentThread().isInterrupted() || summary.isCancel()) {
-          throw new InterruptedException(
-              String.format("%s-%s [Compaction] abort", storageGroupName, 
dataRegionId));
-        }
-
-        // replace the old files with new file, the new is in same position as 
the old
-        if (sequence) {
-          tsFileManager.replace(
-              selectedTsFileResourceList,
-              Collections.emptyList(),
-              targetTsFileList,
-              timePartition,
-              true);
-        } else {
-          tsFileManager.replace(
-              Collections.emptyList(),
-              selectedTsFileResourceList,
-              targetTsFileList,
-              timePartition,
-              false);
-        }
+      compactionLogger = new InnerCompactionLogger(logFile);
+
+      // Here is tmpTargetFile, which is xxx.target
+      targetTsFileList = new 
ArrayList<>(Collections.singletonList(targetTsFileResource));
+      compactionLogger.logSourceFiles(selectedTsFileResourceList);
+      compactionLogger.logTargetFile(targetTsFileResource);
+      compactionLogger.force();
+      LOGGER.info(
+          "{}-{} [Compaction] compaction with {}",
+          storageGroupName,
+          dataRegionId,
+          selectedTsFileResourceList);
+
+      // carry out the compaction
+      performer.setSourceFiles(selectedTsFileResourceList);
+      // As elements in targetFiles may be removed in 
ReadPointCompactionPerformer, we should use
+      // a
+      // mutable list instead of Collections.singletonList()
+      performer.setTargetFiles(targetTsFileList);
+      performer.setSummary(summary);
+      performer.perform();
+
+      CompactionUtils.updateProgressIndex(
+          targetTsFileList, selectedTsFileResourceList, 
Collections.emptyList());
+      CompactionUtils.moveTargetFile(targetTsFileList, true, storageGroupName 
+ "-" + dataRegionId);
+
+      LOGGER.info(
+          "{}-{} [InnerSpaceCompactionTask] start to rename mods file",
+          storageGroupName,
+          dataRegionId);
+      CompactionUtils.combineModsInInnerCompaction(
+          selectedTsFileResourceList, targetTsFileResource);
+
+      if (Thread.currentThread().isInterrupted() || summary.isCancel()) {
+        throw new InterruptedException(
+            String.format("%s-%s [Compaction] abort", storageGroupName, 
dataRegionId));
+      }
 
-        if (targetTsFileResource.isDeleted()) {
-          compactionLogger.logFile(targetTsFileResource, 
CompactionLogger.STR_DELETED_TARGET_FILES);
-          compactionLogger.force();
-        }
+      // replace the old files with new file, the new is in same position as 
the old
+      if (sequence) {
+        tsFileManager.replace(
+            selectedTsFileResourceList,
+            Collections.emptyList(),
+            targetTsFileList,
+            timePartition,
+            true);
+      } else {
+        tsFileManager.replace(
+            Collections.emptyList(),
+            selectedTsFileResourceList,
+            targetTsFileList,
+            timePartition,
+            false);
+      }
 
-        CompactionValidator validator = CompactionValidator.getInstance();
-        if (!validator.validateCompaction(
-            tsFileManager, targetTsFileList, storageGroupName, timePartition, 
!sequence)) {
-          LOGGER.error(
-              "Failed to pass compaction validation, source files is: {}, 
target files is {}",
-              selectedTsFileResourceList,
-              targetTsFileList);
-          throw new CompactionValidationFailedException("Failed to pass 
compaction validation");
-        }
+      if (targetTsFileResource.isDeleted()) {
+        compactionLogger.logEmptyTargetFile(targetTsFileResource);
+        isTargetTsFileEmpty = true;
+        compactionLogger.force();
+      }
 
-        LOGGER.info(
-            "{}-{} [Compaction] Compacted target files, try to get the write 
lock of source files",
-            storageGroupName,
-            dataRegionId);
+      CompactionValidator validator = CompactionValidator.getInstance();
+      if (!validator.validateCompaction(
+          tsFileManager, targetTsFileList, storageGroupName, timePartition, 
!sequence)) {
+        LOGGER.error(
+            "Failed to pass compaction validation, source files is: {}, target 
files is {}",
+            selectedTsFileResourceList,
+            targetTsFileList);
+        throw new CompactionValidationFailedException("Failed to pass 
compaction validation");
+      }
 
-        // release the read lock of all source files, and get the write lock 
of them to delete them
-        for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
-          selectedTsFileResourceList.get(i).writeLock();
-          isHoldingWriteLock[i] = true;
-        }
+      LOGGER.info(
+          "{}-{} [Compaction] Compacted target files, try to get the write 
lock of source files",
+          storageGroupName,
+          dataRegionId);
 
-        if (targetTsFileResource.getTsFile().exists()
-            && targetTsFileResource.getTsFile().length()
-                < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + 
Byte.BYTES) {
-          // the file size is smaller than magic string and version number
-          throw new TsFileNotCompleteException(
-              String.format(
-                  "target file %s is smaller than magic string and version 
number size",
-                  targetTsFileResource));
-        }
+      // release the read lock of all source files, and get the write lock of 
them to delete them
+      for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+        selectedTsFileResourceList.get(i).writeLock();
+        isHoldingWriteLock[i] = true;
+      }
 
-        LOGGER.info(
-            "{}-{} [Compaction] compaction finish, start to delete old files",
-            storageGroupName,
-            dataRegionId);
-        CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(
-            selectedTsFileResourceList, sequence);
-        CompactionUtils.deleteModificationForSourceFile(
-            selectedTsFileResourceList, storageGroupName + "-" + dataRegionId);
-
-        // inner space compaction task has only one target file
-        if (!targetTsFileResource.isDeleted()) {
-          FileMetrics.getInstance()
-              .addTsFile(
-                  targetTsFileResource.getDatabaseName(),
-                  targetTsFileResource.getDataRegionId(),
-                  targetTsFileResource.getTsFile().length(),
-                  sequence,
-                  targetTsFileResource.getTsFile().getName());
-
-          // set target resource to CLOSED, so that it can be selected to 
compact
-          targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
-        } else {
-          // target resource is empty after compaction, then delete it
-          targetTsFileResource.remove();
-        }
-        CompactionMetrics.getInstance().recordSummaryInfo(summary);
-
-        double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
-        LOGGER.info(
-            "{}-{} [Compaction] {} InnerSpaceCompaction task finishes 
successfully, "
-                + "target file is {},"
-                + "time cost is {} s, "
-                + "compaction speed is {} MB/s, {}",
-            storageGroupName,
-            dataRegionId,
-            sequence ? "Sequence" : "Unsequence",
-            targetTsFileResource.getTsFile().getName(),
-            String.format("%.2f", costTime),
-            String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d / 
costTime),
-            summary);
+      if (targetTsFileResource.getTsFile().exists()
+          && targetTsFileResource.getTsFile().length()
+              < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) 
{
+        // the file size is smaller than magic string and version number
+        throw new TsFileNotCompleteException(
+            String.format(
+                "target file %s is smaller than magic string and version 
number size",
+                targetTsFileResource));
       }
-      if (logFile.exists()) {
-        FileUtils.delete(logFile);
+
+      LOGGER.info(
+          "{}-{} [Compaction] compaction finish, start to delete old files",
+          storageGroupName,
+          dataRegionId);
+      
CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(selectedTsFileResourceList,
 sequence);
+      CompactionUtils.deleteModificationForSourceFile(
+          selectedTsFileResourceList, storageGroupName + "-" + dataRegionId);
+
+      // inner space compaction task has only one target file
+      if (!targetTsFileResource.isDeleted()) {
+        FileMetrics.getInstance()
+            .addTsFile(
+                targetTsFileResource.getDatabaseName(),
+                targetTsFileResource.getDataRegionId(),
+                targetTsFileResource.getTsFile().length(),
+                sequence,
+                targetTsFileResource.getTsFile().getName());
+
+        // set target resource to NORMAL, so that it can be selected to compact
+        targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+      } else {
+        // target resource is empty after compaction, then delete it
+        targetTsFileResource.remove();
       }
+      CompactionMetrics.getInstance().recordSummaryInfo(summary);
+
+      double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
+      LOGGER.info(
+          "{}-{} [Compaction] {} InnerSpaceCompaction task finishes 
successfully, "
+              + "target file is {},"
+              + "time cost is {} s, "
+              + "compaction speed is {} MB/s, {}",
+          storageGroupName,
+          dataRegionId,
+          sequence ? "Sequence" : "Unsequence",
+          targetTsFileResource.getTsFile().getName(),
+          String.format("%.2f", costTime),
+          String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d / 
costTime),
+          summary);
+
+      // TODO: when should the compaction logger be closed?
+      compactionLogger.close();
+      FileUtils.delete(logFile);
     } catch (Exception e) {
       isSuccess = false;
-      // catch throwable to handle OOM errors
-      if (!(e instanceof InterruptedException)) {
-        LOGGER.error(
-            "{}-{} [Compaction] Meet errors in inner space compaction.",
-            storageGroupName,
-            dataRegionId,
-            e);
+      printLogWhenException(LOGGER, e);
+      recover();
+    } finally {
+      releaseAllLocks();
+    }
+    return isSuccess;
+  }
+
+  public void recover() {
+    try {
+      if (needRecoverTaskInfoFromLogFile) {
+        recoverTaskInfoFromLogFile();
+      }
+      if (shouldRollback()) {
+        rollback();
       } else {
-        // clean the interrupt flag
-        LOGGER.warn("{}-{} [Compaction] Compaction interrupted", 
storageGroupName, dataRegionId);
-        Thread.interrupted();
+        // finishTask() is invoked when some source file (or its resource file) no longer exists
+        finishTask();
       }
+    } catch (Exception e) {
+      handleRecoverException(e);
+    }
+  }
 
-      // handle exception
-      if (isSequence()) {
-        CompactionExceptionHandler.handleException(
-            storageGroupName + "-" + dataRegionId,
-            logFile,
-            targetTsFileList,
-            selectedTsFileResourceList,
-            Collections.emptyList(),
-            tsFileManager,
-            timePartition,
-            true,
-            isSequence());
-      } else {
-        CompactionExceptionHandler.handleException(
-            storageGroupName + "-" + dataRegionId,
-            logFile,
-            targetTsFileList,
-            Collections.emptyList(),
-            selectedTsFileResourceList,
-            tsFileManager,
-            timePartition,
-            true,
-            isSequence());
+  protected void handleRecoverException(Exception e) {
+    LOGGER.error(
+        "{} [Compaction][Recover] Failed to recover compaction. TaskInfo: {}, 
Exception: {}",
+        dataRegionId,
+        this,
+        e);
+    tsFileManager.setAllowCompaction(false);
+    LOGGER.error("stop compaction because of exception during recovering");
+    // TODO: consider whether the system needs to be set to READONLY
+    
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
+  }
+
+  private void removeTsFileInMemory(List<TsFileResource> resourceList) {
+    tsFileManager.writeLock("CompactionExceptionHandler");
+    try {
+      for (TsFileResource targetTsFile : resourceList) {
+        if (targetTsFile == null) {
+          // target file has been deleted due to empty after compaction
+          continue;
+        }
+        tsFileManager.remove(targetTsFile, targetTsFile.isSeq());
       }
     } finally {
-      releaseAllLocks();
+      tsFileManager.writeUnlock();
     }
-    return isSuccess;
+  }
+
+  private void insertFilesToTsFileManager(List<TsFileResource> tsFiles) throws 
IOException {
+    for (TsFileResource tsFileResource : tsFiles) {
+      tsFileManager.keepOrderInsert(tsFileResource, tsFileResource.isSeq());
+    }
+  }
+
+  private void rollback() throws Exception {
+    // if the task has started, undo its in-memory changes: drop the target, re-insert the sources
+    if (recoverMemoryStatus) {
+      removeTsFileInMemory(Collections.singletonList(targetTsFileResource));
+      insertFilesToTsFileManager(selectedTsFileResourceList);
+    }
+    deleteCompactionModsFile(selectedTsFileResourceList);
+    // delete target file
+    if (targetTsFileResource != null) {
+      if (!deleteTsFileOnDisk(targetTsFileResource)) {
+        throw new CompactionRecoverException(
+            String.format("failed to delete target file %s", 
targetTsFileResource));
+      }
+    }
+  }
+
+  /**
+   * Finishes a compaction instead of rolling it back; recover() calls this when shouldRollback()
+   * returns false. Removes an empty target file or checks that the target file is complete, then
+   * deletes the source files and their compaction mods files.
+   *
+   * @throws IOException propagated from deleteCompactionModsFile
+   * @throws CompactionRecoverException if the target file is incomplete or a file cannot be deleted
+   */
+  private void finishTask() throws IOException {
+    if (targetTsFileResource.isDeleted() || isTargetTsFileEmpty) {
+      // the target file is empty after compaction, so it only has to be removed;
+      // remove() returns false on failure (see deleteTsFileOnDisk), hence the negation
+      if (!targetTsFileResource.remove()) {
+        throw new CompactionRecoverException(
+            String.format("failed to delete empty target file %s", targetTsFileResource));
+      }
+    } else {
+      File targetFile = targetTsFileResource.getTsFile();
+      if (targetFile == null
+          || !TsFileUtils.isTsFileComplete(targetTsFileResource.getTsFile())) {
+        throw new CompactionRecoverException(
+            String.format("Target file is not completed. %s", targetFile));
+      }
+      if (recoverMemoryStatus) {
+        targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+      }
+    }
+    if (!deleteTsFilesOnDisk(selectedTsFileResourceList)) {
+      throw new CompactionRecoverException("source files cannot be deleted successfully");
+    }
+    if (recoverMemoryStatus) {
+      // NOTE: the metrics updated here may be inaccurate
+      FileMetrics.getInstance().deleteTsFile(true, selectedTsFileResourceList);
+    }
+    deleteCompactionModsFile(selectedTsFileResourceList);
+  }
+
+  /**
+   * Finds the File object for the given filePath by searching every data directory. Returns
+   * null if the file is not found in any of them.
+   */
+  public File getFileFromDataDirs(String filePath) {
+    String[] dataDirs = 
IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs();
+    for (String dataDir : dataDirs) {
+      File f = new File(dataDir, filePath);
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return null;
+  }
+
+  public File getRealTargetFile(TsFileIdentifier targetFileIdentifier) {
+    File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+    File targetFile =
+        getFileFromDataDirs(
+            targetFileIdentifier
+                .getFilePath()
+                .replace(
+                    IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX, 
TsFileConstant.TSFILE_SUFFIX));
+    return tmpTargetFile != null ? tmpTargetFile : targetFile;
+  }
+
+  public void deleteCompactionModsFile(List<TsFileResource> 
tsFileResourceList) throws IOException {
+    for (TsFileResource seqFile : tsFileResourceList) {
+      ModificationFile modificationFile = seqFile.getCompactionModFile();
+      if (modificationFile.exists()) {
+        modificationFile.remove();
+      }
+    }
+  }
+
+  private boolean deleteTsFilesOnDisk(List<TsFileResource> tsFiles) {
+    for (TsFileResource resource : tsFiles) {
+      if (!deleteTsFileOnDisk(resource)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Removes the given TsFile resource via {@code remove()} while holding that resource's write
+   * lock.
+   *
+   * @param tsFileResource the resource to remove
+   * @return the result of {@code tsFileResource.remove()}; callers treat false as a failed delete
+   */
+  private boolean deleteTsFileOnDisk(TsFileResource tsFileResource) {
+    tsFileResource.writeLock();
+    try {
+      return tsFileResource.remove();
+    } finally {
+      // Unlock the same resource that was locked above, not targetTsFileResource: this method
+      // is also called for every source file, whose lock would otherwise never be released.
+      tsFileResource.writeUnlock();
+    }
+  }
+
+  private boolean shouldRollback() {
+    return checkAllSourceFileExists(selectedTsFileResourceList);
+  }
+
+  private boolean checkAllSourceFileExists(List<TsFileResource> 
tsFileResources) {
+    for (TsFileResource tsFileResource : tsFileResources) {
+      if (!tsFileResource.getTsFile().exists() || 
!tsFileResource.resourceFileExists()) {
+        return false;
+      }
+    }
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index a733d80941d..d188905a614 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -212,6 +212,16 @@ public class CompactionUtils {
     }
   }
 
+  public static void deleteCompactionModsFile(List<TsFileResource> 
tsFileResourceList)
+      throws IOException {
+    for (TsFileResource seqFile : tsFileResourceList) {
+      ModificationFile modificationFile = seqFile.getCompactionModFile();
+      if (modificationFile.exists()) {
+        modificationFile.remove();
+      }
+    }
+  }
+
   public static boolean deleteTsFilesInDisk(
       Collection<TsFileResource> mergeTsFiles, String storageGroupName) {
     logger.info("{} [Compaction] Compaction starts to delete real file ", 
storageGroupName);
@@ -449,6 +459,8 @@ public class CompactionUtils {
     deleteSourceTsFileAndUpdateFileMetrics(sourceUnseqResourceList, false);
   }
 
+  public static void deleteTsFiles(List<TsFileResource> tsfiles) {}
+
   public static void deleteSourceTsFileAndUpdateFileMetrics(
       List<TsFileResource> resources, boolean seq) {
     List<TsFileResource> removeResources = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
index 61edea2d225..031a7f95a7a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
@@ -19,9 +19,6 @@
 
 package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -29,16 +26,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.SEQUENCE_NAME_FROM_OLD;
 import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES;
-import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_MERGE_START_FROM_OLD;
-import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SEQ_FILES_FROM_OLD;
 import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES;
-import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES_FROM_OLD;
 import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_TARGET_FILES;
-import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_TARGET_FILES_FROM_OLD;
-import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_UNSEQ_FILES_FROM_OLD;
-import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.UNSEQUENCE_NAME_FROM_OLD;
 
 public class CompactionLogAnalyzer {
 
@@ -77,91 +67,6 @@ public class CompactionLogAnalyzer {
     }
   }
 
-  /**
-   * Analyze inner space compaction log of previous version (<0.13).
-   *
-   * @throws IOException if io errors occurred
-   */
-  public void analyzeOldInnerCompactionLog() throws IOException {
-    isLogFromOld = true;
-    String currLine;
-    try (BufferedReader bufferedReader = new BufferedReader(new 
FileReader(logFile))) {
-      while ((currLine = bufferedReader.readLine()) != null) {
-        switch (currLine) {
-          case STR_SOURCE_FILES_FROM_OLD:
-            currLine = bufferedReader.readLine();
-            
sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(currLine));
-            break;
-          case STR_TARGET_FILES_FROM_OLD:
-            currLine = bufferedReader.readLine();
-            
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(currLine));
-            break;
-          case STR_SOURCE_FILES:
-            currLine = bufferedReader.readLine();
-            
sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(currLine));
-            break;
-          case STR_TARGET_FILES:
-            currLine = bufferedReader.readLine();
-            
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(currLine));
-            break;
-          case SEQUENCE_NAME_FROM_OLD:
-          case UNSEQUENCE_NAME_FROM_OLD:
-            break;
-          default:
-            break;
-        }
-      }
-    }
-  }
-
-  /**
-   * Analyze cross space compaction log of previous version (<0.13).
-   *
-   * @throws IOException if io errors occurred
-   */
-  @SuppressWarnings("squid:S135")
-  public void analyzeOldCrossCompactionLog() throws IOException {
-    isLogFromOld = true;
-    String currLine;
-    boolean isSeqSource = true;
-    try (BufferedReader bufferedReader = new BufferedReader(new 
FileReader(logFile))) {
-      while ((currLine = bufferedReader.readLine()) != null) {
-        if (currLine.equals(STR_UNSEQ_FILES_FROM_OLD)) {
-          isSeqSource = false;
-          continue;
-        } else if (currLine.equals(STR_SEQ_FILES_FROM_OLD)) {
-          isSeqSource = true;
-          continue;
-        } else if (currLine.equals(STR_MERGE_START_FROM_OLD)) {
-          break;
-        }
-        analyzeOldFilePath(isSeqSource, currLine);
-      }
-    }
-  }
-
-  private void analyzeOldFilePath(boolean isSeqSource, String oldFilePath) {
-    if (oldFilePath.startsWith("root")) {
-      
sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(oldFilePath));
-    } else {
-      
sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(oldFilePath));
-    }
-
-    int pos = oldFilePath.lastIndexOf(TsFileConstant.TSFILE_SUFFIX);
-    if (isSeqSource) {
-      String targetFilePath =
-          oldFilePath.substring(0, pos)
-              + TsFileConstant.TSFILE_SUFFIX
-              + IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD
-              + oldFilePath.substring(pos + 
TsFileConstant.TSFILE_SUFFIX.length());
-      if (oldFilePath.startsWith("root")) {
-        
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(targetFilePath));
-      } else {
-        
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(targetFilePath));
-      }
-    }
-  }
-
   public List<TsFileIdentifier> getSourceFileInfos() {
     return sourceFileInfos;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
index 9d02418b8ea..248726e34e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
@@ -30,26 +30,22 @@ import java.util.List;
 public class CompactionLogger implements AutoCloseable {
 
   public static final String CROSS_COMPACTION_LOG_NAME_SUFFIX = 
".cross-compaction.log";
-  public static final String CROSS_COMPACTION_LOG_NAME_FROM_OLD = "merge.log";
   public static final String INNER_COMPACTION_LOG_NAME_SUFFIX = 
".inner-compaction.log";
-  public static final String INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD = 
".compaction.log";
 
   public static final String STR_SOURCE_FILES = "source";
   public static final String STR_TARGET_FILES = "target";
-
   public static final String STR_DELETED_TARGET_FILES = "empty";
 
-  public static final String STR_SOURCE_FILES_FROM_OLD = "info-source";
-  public static final String STR_TARGET_FILES_FROM_OLD = "info-target";
-  public static final String STR_SEQ_FILES_FROM_OLD = "seqFiles";
-  public static final String STR_UNSEQ_FILES_FROM_OLD = "unseqFiles";
-  public static final String SEQUENCE_NAME_FROM_OLD = "sequence";
-  public static final String UNSEQUENCE_NAME_FROM_OLD = "unsequence";
-  public static final String STR_MERGE_START_FROM_OLD = "merge start";
+  protected List<TsFileIdentifier> sourceFileIdentifiers;
+  protected List<TsFileIdentifier> targetFileIdentifiers;
+  protected List<TsFileIdentifier> deletedTargetFileIdentifiers;
 
+  protected boolean needRecoverFromLogFile;
+  private File logFile;
   private FileOutputStream logStream;
 
   public CompactionLogger(File logFile) throws IOException {
+    this.logFile = logFile;
     logStream = new FileOutputStream(logFile, true);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java
new file mode 100644
index 00000000000..15473f68e41
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log;
+
+import java.util.List;
+
+public interface ICompactionLogger {
+  List<String> getSourceFileList();
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java
new file mode 100644
index 00000000000..e96b5104a10
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log;
+
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link CompactionLogger} that, in addition to delegating to the inherited log methods, keeps
+ * in memory the source files, the target file and the empty target file that were passed to it.
+ *
+ * <p>NOTE(review): {@code logFile}, {@code logFiles} and the {@code STR_*} constants are not
+ * defined in this class; presumably they are inherited from {@link CompactionLogger} and write
+ * text entries to the log file given to the constructor. Confirm against that class.
+ */
+public class InnerCompactionLogger extends CompactionLogger {
+
+  // Every resource passed to logSourceFiles so far, in call order.
+  private List<TsFileResource> sourceFiles;
+  // Last resource passed to logTargetFile; null until that method is called.
+  private TsFileResource targetFile;
+  // Last resource passed to logEmptyTargetFile; null until that method is called.
+  private TsFileResource emptyTargetFile;
+
+  /**
+   * Creates a logger for the given log file and starts with an empty source file list.
+   *
+   * @param logFile the log file handed to the {@link CompactionLogger} constructor
+   * @throws IOException if the superclass constructor fails
+   */
+  public InnerCompactionLogger(File logFile) throws IOException {
+    super(logFile);
+    this.sourceFiles = new ArrayList<>();
+  }
+
+  /**
+   * Returns the source files recorded so far.
+   *
+   * @return the internal list itself, not a copy, so changes made by the caller are visible to
+   *     this logger
+   */
+  public List<TsFileResource> getSourceFiles() {
+    return sourceFiles;
+  }
+
+  /**
+   * Returns the target file recorded by {@link #logTargetFile(TsFileResource)}.
+   *
+   * @return the last recorded target file, or {@code null} if none has been recorded
+   */
+  public TsFileResource getTargetFile() {
+    return targetFile;
+  }
+
+  /**
+   * Returns the empty target file recorded by {@link #logEmptyTargetFile(TsFileResource)}.
+   *
+   * @return the last recorded empty target file, or {@code null} if none has been recorded
+   */
+  public TsFileResource getEmptyTargetFile() {
+    return emptyTargetFile;
+  }
+
+  /**
+   * Appends the given resources to the in-memory source file list and then logs them under the
+   * {@code STR_SOURCE_FILES} tag.
+   *
+   * @param sourceFiles the source files to remember and log
+   * @throws IOException if {@code logFiles} fails; the in-memory list has already been updated
+   *     at that point
+   */
+  public void logSourceFiles(List<TsFileResource> sourceFiles) throws 
IOException {
+    this.sourceFiles.addAll(sourceFiles);
+    logFiles(sourceFiles, STR_SOURCE_FILES);
+  }
+
+  /**
+   * Remembers the given resource as the target file and then logs it under the {@code
+   * STR_TARGET_FILES} tag. A previously remembered target file is replaced.
+   *
+   * @param targetFile the target file to remember and log
+   * @throws IOException if {@code logFile} fails; the field has already been updated at that
+   *     point
+   */
+  public void logTargetFile(TsFileResource targetFile) throws IOException {
+    this.targetFile = targetFile;
+    logFile(targetFile, STR_TARGET_FILES);
+  }
+
+  /**
+   * Remembers the given resource as the empty target file and then logs it under the {@code
+   * STR_DELETED_TARGET_FILES} tag. A previously remembered empty target file is replaced.
+   *
+   * @param emptyTargetFile the empty target file to remember and log
+   * @throws IOException if {@code logFile} fails; the field has already been updated at that
+   *     point
+   */
+  public void logEmptyTargetFile(TsFileResource emptyTargetFile) throws 
IOException {
+    this.emptyTargetFile = emptyTargetFile;
+    logFile(emptyTargetFile, STR_DELETED_TARGET_FILES);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
index e1ac67bd8ee..6b3b5fdddd7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
@@ -51,12 +51,6 @@ public class TsFileIdentifier {
   public static final int TIME_PARTITION_OFFSET_IN_LOG = 3;
   public static final int FILE_NAME_OFFSET_IN_LOG = 4;
 
-  private static final int LOGICAL_SG_OFFSET_IN_LOG_FROM_OLD = 0;
-  private static final int DATA_REGION_OFFSET_IN_LOG_FROM_OLD = 1;
-  private static final int TIME_PARTITION_OFFSET_IN_LOG_FROM_OLD = 2;
-  private static final int FILE_NAME_OFFSET_IN_LOG_FROM_OLD = 3;
-  private static final int SEQUENCE_OFFSET_IN_LOG_FROM_OLD = 4;
-
   private static final String SEQUENCE_STR = "sequence";
   private static final String UNSEQUENCE_STR = "unsequence";
 
@@ -123,30 +117,6 @@ public class TsFileIdentifier {
         splittedFileInfo[FILE_NAME_OFFSET_IN_LOG]);
   }
 
-  /**
-   * This function generates an instance of CompactionFileIdentifier by 
parsing the old info string
-   * from previous version (<0.13) of a tsfile(usually recorded in a 
compaction.log). Such as
-   * “root.test.sg 0 0 1-1-0-0.tsfile true" from old cross space compaction 
log and "root.test.sg 0
-   * 0 1-1-0-0.tsfile sequence" from old inner space compaction log.
-   */
-  public static TsFileIdentifier getFileIdentifierFromOldInfoString(String 
oldInfoString) {
-    String[] splittedFileInfo = oldInfoString.split(INFO_SEPARATOR);
-    int length = splittedFileInfo.length;
-    if (length != 5) {
-      throw new RuntimeException(
-          String.format(
-              "String %s is not a legal file info string from previous version 
(<0.13)",
-              oldInfoString));
-    }
-    return new TsFileIdentifier(
-        splittedFileInfo[LOGICAL_SG_OFFSET_IN_LOG_FROM_OLD],
-        splittedFileInfo[DATA_REGION_OFFSET_IN_LOG_FROM_OLD],
-        splittedFileInfo[TIME_PARTITION_OFFSET_IN_LOG_FROM_OLD],
-        splittedFileInfo[SEQUENCE_OFFSET_IN_LOG_FROM_OLD].equals("true")
-            || 
splittedFileInfo[SEQUENCE_OFFSET_IN_LOG_FROM_OLD].equals(SEQUENCE_STR),
-        splittedFileInfo[FILE_NAME_OFFSET_IN_LOG_FROM_OLD]);
-  }
-
   @Override
   public String toString() {
     return String.format(

Reply via email to