This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch compaction_recover_logger_1017
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/compaction_recover_logger_1017 
by this push:
     new a0609d3cc2e add compaction task stage
a0609d3cc2e is described below

commit a0609d3cc2e185ff979e9d8292610b0eb6dec951
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Oct 17 23:50:25 2023 +0800

    add compaction task stage
---
 .../execute/task/AbstractCompactionTask.java       |   7 +-
 .../execute/task/CrossSpaceCompactionTask.java     |  45 +---
 .../execute/task/InnerSpaceCompactionTask.java     | 264 ++++++++++-----------
 .../execute/utils/log/CompactionLogAnalyzer.java   |  17 +-
 .../execute/utils/log/CompactionLogger.java        |   6 +
 ...pactionLogger.java => CompactionTaskStage.java} |  19 +-
 ...tionLogger.java => SimpleCompactionLogger.java} |  29 +--
 7 files changed, 189 insertions(+), 198 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index eb17f10f8c0..3be3ee32fd7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionTaskStage;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -63,7 +64,7 @@ public abstract class AbstractCompactionTask {
   protected long serialId;
   protected boolean crossTask;
   protected boolean innerSeqTask;
-
+  protected CompactionTaskStage taskStage;
   protected long memoryCost = 0L;
 
   protected boolean recoverMemoryStatus;
@@ -312,6 +313,10 @@ public abstract class AbstractCompactionTask {
     }
   }
 
+  public void setTaskStage(CompactionTaskStage stage) {
+    this.taskStage = stage;
+  }
+
   public boolean isTaskRan() {
     return summary.isRan();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index eaaf78df9c1..f03ab79f12c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
@@ -31,6 +30,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subt
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -39,12 +39,12 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import org.apache.iotdb.tsfile.utils.TsFileUtils;
 
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -121,6 +121,7 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
         this.emptyTargetTsFileResourceList.add(targetTsFile);
       }
     }
+    this.taskStage = logAnalyzer.getTaskStage();
   }
 
   @Override
@@ -178,11 +179,11 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
                   + targetTsfileResourceList.get(0).getTsFile().getName()
                   + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
 
-      try (CompactionLogger compactionLogger = new CompactionLogger(logFile)) {
+      try (SimpleCompactionLogger compactionLogger = new 
SimpleCompactionLogger(logFile)) {
         // print the path of the temporary file first for priority check 
during recovery
-        compactionLogger.logFiles(selectedSequenceFiles, 
CompactionLogger.STR_SOURCE_FILES);
-        compactionLogger.logFiles(selectedUnsequenceFiles, 
CompactionLogger.STR_SOURCE_FILES);
-        compactionLogger.logFiles(targetTsfileResourceList, 
CompactionLogger.STR_TARGET_FILES);
+        compactionLogger.logSourceFiles(selectedSequenceFiles);
+        compactionLogger.logSourceFiles(selectedUnsequenceFiles);
+        compactionLogger.logTargetFiles(targetTsfileResourceList);
         compactionLogger.force();
 
         performer.setSourceFiles(selectedSequenceFiles, 
selectedUnsequenceFiles);
@@ -209,7 +210,7 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
         for (TsFileResource targetResource : targetTsfileResourceList) {
           if (targetResource.isDeleted()) {
             emptyTargetTsFileResourceList.add(targetResource);
-            compactionLogger.logFile(targetResource, 
CompactionLogger.STR_DELETED_TARGET_FILES);
+            compactionLogger.logEmptyTargetFile(targetResource);
             compactionLogger.force();
           }
         }
@@ -284,35 +285,11 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
                 (selectedSeqFileSize + selectedUnseqFileSize) / 1024.0d / 
1024.0d / costTime),
             summary);
       }
-      if (logFile.exists()) {
-        FileUtils.delete(logFile);
-      }
+      Files.deleteIfExists(logFile.toPath());
     } catch (Exception e) {
       isSuccess = false;
-      // catch throwable to handle OOM errors
-      if (!(e instanceof InterruptedException)) {
-        LOGGER.error(
-            "{}-{} [Compaction] Meet errors in cross space compaction.",
-            storageGroupName,
-            dataRegionId,
-            e);
-      } else {
-        LOGGER.warn("{}-{} [Compaction] Compaction interrupted", 
storageGroupName, dataRegionId);
-        // clean the interrupted flag
-        Thread.interrupted();
-      }
-
-      // handle exception
-      CompactionExceptionHandler.handleException(
-          storageGroupName + "-" + dataRegionId,
-          logFile,
-          targetTsfileResourceList,
-          selectedSequenceFiles,
-          selectedUnsequenceFiles,
-          tsFileManager,
-          timePartition,
-          false,
-          true);
+      printLogWhenException(LOGGER, e);
+      recover();
     } finally {
       releaseAllLocks();
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 8644b5a054c..180ba500877 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -32,7 +32,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subt
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.InnerCompactionLogger;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
@@ -46,10 +46,9 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.utils.TsFileUtils;
 
-import org.apache.commons.io.FileUtils;
-
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -65,9 +64,7 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
   protected int sumOfCompactionCount;
   protected long maxFileVersion;
   protected int maxCompactionCount;
-
   private File logFile;
-  private InnerCompactionLogger compactionLogger;
   protected List<TsFileResource> targetTsFileList;
   protected boolean[] isHoldingWriteLock;
   protected long maxModsFileSize;
@@ -115,6 +112,7 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
       this.targetTsFileResource = new TsFileResource(targetFileOnDisk);
     }
     this.isTargetTsFileEmpty = deletedTargetFileIdentifiers.size() > 0;
+    this.taskStage = logAnalyzer.getTaskStage();
   }
 
   public InnerSpaceCompactionTask(
@@ -184,141 +182,141 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
       targetTsFileResource =
           TsFileNameGenerator.getInnerCompactionTargetFileResource(
               selectedTsFileResourceList, sequence);
-      compactionLogger = new InnerCompactionLogger(logFile);
-
-      // Here is tmpTargetFile, which is xxx.target
-      targetTsFileList = new 
ArrayList<>(Collections.singletonList(targetTsFileResource));
-      compactionLogger.logSourceFiles(selectedTsFileResourceList);
-      compactionLogger.logTargetFile(targetTsFileResource);
-      compactionLogger.force();
-      LOGGER.info(
-          "{}-{} [Compaction] compaction with {}",
-          storageGroupName,
-          dataRegionId,
-          selectedTsFileResourceList);
-
-      // carry out the compaction
-      performer.setSourceFiles(selectedTsFileResourceList);
-      // As elements in targetFiles may be removed in 
ReadPointCompactionPerformer, we should use
-      // a
-      // mutable list instead of Collections.singletonList()
-      performer.setTargetFiles(targetTsFileList);
-      performer.setSummary(summary);
-      performer.perform();
-
-      CompactionUtils.updateProgressIndex(
-          targetTsFileList, selectedTsFileResourceList, 
Collections.emptyList());
-      CompactionUtils.moveTargetFile(targetTsFileList, true, storageGroupName 
+ "-" + dataRegionId);
-
-      LOGGER.info(
-          "{}-{} [InnerSpaceCompactionTask] start to rename mods file",
-          storageGroupName,
-          dataRegionId);
-      CompactionUtils.combineModsInInnerCompaction(
-          selectedTsFileResourceList, targetTsFileResource);
-
-      if (Thread.currentThread().isInterrupted() || summary.isCancel()) {
-        throw new InterruptedException(
-            String.format("%s-%s [Compaction] abort", storageGroupName, 
dataRegionId));
-      }
-
-      // replace the old files with new file, the new is in same position as 
the old
-      if (sequence) {
-        tsFileManager.replace(
-            selectedTsFileResourceList,
-            Collections.emptyList(),
-            targetTsFileList,
-            timePartition,
-            true);
-      } else {
-        tsFileManager.replace(
-            Collections.emptyList(),
-            selectedTsFileResourceList,
-            targetTsFileList,
-            timePartition,
-            false);
-      }
-
-      if (targetTsFileResource.isDeleted()) {
-        compactionLogger.logEmptyTargetFile(targetTsFileResource);
-        isTargetTsFileEmpty = true;
+      try (SimpleCompactionLogger compactionLogger = new 
SimpleCompactionLogger(logFile)) {
+        // Here is tmpTargetFile, which is xxx.target
+        targetTsFileList = new 
ArrayList<>(Collections.singletonList(targetTsFileResource));
+        compactionLogger.logSourceFiles(selectedTsFileResourceList);
+        compactionLogger.logTargetFile(targetTsFileResource);
         compactionLogger.force();
-      }
+        LOGGER.info(
+            "{}-{} [Compaction] compaction with {}",
+            storageGroupName,
+            dataRegionId,
+            selectedTsFileResourceList);
+
+        // carry out the compaction
+        performer.setSourceFiles(selectedTsFileResourceList);
+        // As elements in targetFiles may be removed in 
ReadPointCompactionPerformer, we should use
+        // a
+        // mutable list instead of Collections.singletonList()
+        performer.setTargetFiles(targetTsFileList);
+        performer.setSummary(summary);
+        performer.perform();
+
+        CompactionUtils.updateProgressIndex(
+            targetTsFileList, selectedTsFileResourceList, 
Collections.emptyList());
+        CompactionUtils.moveTargetFile(
+            targetTsFileList, true, storageGroupName + "-" + dataRegionId);
+
+        LOGGER.info(
+            "{}-{} [InnerSpaceCompactionTask] start to rename mods file",
+            storageGroupName,
+            dataRegionId);
+        CompactionUtils.combineModsInInnerCompaction(
+            selectedTsFileResourceList, targetTsFileResource);
+
+        if (Thread.currentThread().isInterrupted() || summary.isCancel()) {
+          throw new InterruptedException(
+              String.format("%s-%s [Compaction] abort", storageGroupName, 
dataRegionId));
+        }
 
-      CompactionValidator validator = CompactionValidator.getInstance();
-      if (!validator.validateCompaction(
-          tsFileManager, targetTsFileList, storageGroupName, timePartition, 
!sequence)) {
-        LOGGER.error(
-            "Failed to pass compaction validation, source files is: {}, target 
files is {}",
-            selectedTsFileResourceList,
-            targetTsFileList);
-        throw new CompactionValidationFailedException("Failed to pass 
compaction validation");
-      }
+        // replace the old files with new file, the new is in same position as 
the old
+        if (sequence) {
+          tsFileManager.replace(
+              selectedTsFileResourceList,
+              Collections.emptyList(),
+              targetTsFileList,
+              timePartition,
+              true);
+        } else {
+          tsFileManager.replace(
+              Collections.emptyList(),
+              selectedTsFileResourceList,
+              targetTsFileList,
+              timePartition,
+              false);
+        }
 
-      LOGGER.info(
-          "{}-{} [Compaction] Compacted target files, try to get the write 
lock of source files",
-          storageGroupName,
-          dataRegionId);
+        if (targetTsFileResource.isDeleted()) {
+          compactionLogger.logEmptyTargetFile(targetTsFileResource);
+          isTargetTsFileEmpty = true;
+          compactionLogger.force();
+        }
 
-      // release the read lock of all source files, and get the write lock of 
them to delete them
-      for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
-        selectedTsFileResourceList.get(i).writeLock();
-        isHoldingWriteLock[i] = true;
-      }
+        CompactionValidator validator = CompactionValidator.getInstance();
+        if (!validator.validateCompaction(
+            tsFileManager, targetTsFileList, storageGroupName, timePartition, 
!sequence)) {
+          LOGGER.error(
+              "Failed to pass compaction validation, source files is: {}, 
target files is {}",
+              selectedTsFileResourceList,
+              targetTsFileList);
+          throw new CompactionValidationFailedException("Failed to pass 
compaction validation");
+        }
 
-      if (targetTsFileResource.getTsFile().exists()
-          && targetTsFileResource.getTsFile().length()
-              < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) 
{
-        // the file size is smaller than magic string and version number
-        throw new TsFileNotCompleteException(
-            String.format(
-                "target file %s is smaller than magic string and version 
number size",
-                targetTsFileResource));
-      }
+        LOGGER.info(
+            "{}-{} [Compaction] Compacted target files, try to get the write 
lock of source files",
+            storageGroupName,
+            dataRegionId);
+        // release the read lock of all source files, and get the write lock 
of them to delete them
+        for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+          selectedTsFileResourceList.get(i).writeLock();
+          isHoldingWriteLock[i] = true;
+        }
 
-      LOGGER.info(
-          "{}-{} [Compaction] compaction finish, start to delete old files",
-          storageGroupName,
-          dataRegionId);
-      
CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(selectedTsFileResourceList,
 sequence);
-      CompactionUtils.deleteModificationForSourceFile(
-          selectedTsFileResourceList, storageGroupName + "-" + dataRegionId);
-
-      // inner space compaction task has only one target file
-      if (!targetTsFileResource.isDeleted()) {
-        FileMetrics.getInstance()
-            .addTsFile(
-                targetTsFileResource.getDatabaseName(),
-                targetTsFileResource.getDataRegionId(),
-                targetTsFileResource.getTsFile().length(),
-                sequence,
-                targetTsFileResource.getTsFile().getName());
-
-        // set target resource to CLOSED, so that it can be selected to compact
-        targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
-      } else {
-        // target resource is empty after compaction, then delete it
-        targetTsFileResource.remove();
+        if (targetTsFileResource.getTsFile().exists()
+            && targetTsFileResource.getTsFile().length()
+                < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + 
Byte.BYTES) {
+          // the file size is smaller than magic string and version number
+          throw new TsFileNotCompleteException(
+              String.format(
+                  "target file %s is smaller than magic string and version 
number size",
+                  targetTsFileResource));
+        }
+
+        LOGGER.info(
+            "{}-{} [Compaction] compaction finish, start to delete old files",
+            storageGroupName,
+            dataRegionId);
+        CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(
+            selectedTsFileResourceList, sequence);
+        CompactionUtils.deleteModificationForSourceFile(
+            selectedTsFileResourceList, storageGroupName + "-" + dataRegionId);
+
+        // inner space compaction task has only one target file
+        if (!targetTsFileResource.isDeleted()) {
+          FileMetrics.getInstance()
+              .addTsFile(
+                  targetTsFileResource.getDatabaseName(),
+                  targetTsFileResource.getDataRegionId(),
+                  targetTsFileResource.getTsFile().length(),
+                  sequence,
+                  targetTsFileResource.getTsFile().getName());
+
+          // set target resource to CLOSED, so that it can be selected to 
compact
+          targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+        } else {
+          // target resource is empty after compaction, then delete it
+          targetTsFileResource.remove();
+        }
+        CompactionMetrics.getInstance().recordSummaryInfo(summary);
+
+        double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
+        LOGGER.info(
+            "{}-{} [Compaction] {} InnerSpaceCompaction task finishes 
successfully, "
+                + "target file is {},"
+                + "time cost is {} s, "
+                + "compaction speed is {} MB/s, {}",
+            storageGroupName,
+            dataRegionId,
+            sequence ? "Sequence" : "Unsequence",
+            targetTsFileResource.getTsFile().getName(),
+            String.format("%.2f", costTime),
+            String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d / 
costTime),
+            summary);
+
+        // 在何时关闭
       }
-      CompactionMetrics.getInstance().recordSummaryInfo(summary);
-
-      double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
-      LOGGER.info(
-          "{}-{} [Compaction] {} InnerSpaceCompaction task finishes 
successfully, "
-              + "target file is {},"
-              + "time cost is {} s, "
-              + "compaction speed is {} MB/s, {}",
-          storageGroupName,
-          dataRegionId,
-          sequence ? "Sequence" : "Unsequence",
-          targetTsFileResource.getTsFile().getName(),
-          String.format("%.2f", costTime),
-          String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d / 
costTime),
-          summary);
-
-      // 在何时关闭
-      compactionLogger.close();
-      FileUtils.delete(logFile);
+      Files.deleteIfExists(logFile.toPath());
     } catch (Exception e) {
       isSuccess = false;
       printLogWhenException(LOGGER, e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
index 031a7f95a7a..0baa7117392 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
@@ -25,6 +25,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Stream;
 
 import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES;
 import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES;
@@ -36,6 +37,7 @@ public class CompactionLogAnalyzer {
   private final List<TsFileIdentifier> sourceFileInfos = new ArrayList<>();
   private final List<TsFileIdentifier> targetFileInfos = new ArrayList<>();
   private final List<TsFileIdentifier> deletedTargetFileInfos = new 
ArrayList<>();
+  private CompactionTaskStage taskStage;
   private boolean isLogFromOld = false;
 
   public CompactionLogAnalyzer(File logFile) {
@@ -48,9 +50,10 @@ public class CompactionLogAnalyzer {
    * @throws IOException if io errors occurred
    */
   public void analyze() throws IOException {
-    String currLine;
     try (BufferedReader bufferedReader = new BufferedReader(new 
FileReader(logFile))) {
+      String currLine;
       while ((currLine = bufferedReader.readLine()) != null) {
+        final String lineValue = currLine;
         String fileInfo;
         if (currLine.startsWith(STR_SOURCE_FILES)) {
           fileInfo = currLine.replaceFirst(STR_SOURCE_FILES + 
TsFileIdentifier.INFO_SEPARATOR, "");
@@ -58,10 +61,16 @@ public class CompactionLogAnalyzer {
         } else if (currLine.startsWith(STR_TARGET_FILES)) {
           fileInfo = currLine.replaceFirst(STR_TARGET_FILES + 
TsFileIdentifier.INFO_SEPARATOR, "");
           
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo));
-        } else {
+        } else if (currLine.startsWith(STR_DELETED_TARGET_FILES)) {
           fileInfo =
               currLine.replaceFirst(STR_DELETED_TARGET_FILES + 
TsFileIdentifier.INFO_SEPARATOR, "");
           
deletedTargetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo));
+        } else if (Stream.of(CompactionTaskStage.values())
+            .anyMatch(stage -> lineValue.startsWith(stage.name()))) {
+          taskStage = CompactionTaskStage.valueOf(currLine);
+        } else {
+          throw new IllegalArgumentException(
+              String.format("unknown compaction log line: %s", currLine));
         }
       }
     }
@@ -79,7 +88,7 @@ public class CompactionLogAnalyzer {
     return deletedTargetFileInfos;
   }
 
-  public boolean isLogFromOld() {
-    return isLogFromOld;
+  public CompactionTaskStage getTaskStage() {
+    return taskStage;
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
index 248726e34e1..2ded4a27b73 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
@@ -60,6 +60,12 @@ public class CompactionLogger implements AutoCloseable {
     }
   }
 
+  public void logTaskStage(CompactionTaskStage stage) throws IOException {
+    logStream.write(stage.name().getBytes());
+    logStream.write(System.lineSeparator().getBytes());
+    logStream.flush();
+  }
+
   public void logFile(TsFileResource tsFile, String flag) throws IOException {
     String log =
         flag
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java
similarity index 63%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java
index 15473f68e41..9a549a1790b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java
@@ -19,8 +19,21 @@
 
 package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log;
 
-import java.util.List;
+public enum CompactionTaskStage {
+  NOT_STARTED("task does not started"),
+  STARTED("task has started"),
+  TARGET_FILE_GENERATED("target file has been generated completely"),
+  TARGET_FILE_REPLACED("target file has been replaced in TsFileManager"),
+  SOURCE_FILE_CLEANED("source files are cleaned up"),
+  FINISHED("task finished");
 
-public interface ICompactionLogger {
-  List<String> getSourceFileList();
+  private final String descirption;
+
+  CompactionTaskStage(String descirption) {
+    this.descirption = descirption;
+  }
+
+  public String getDescription() {
+    return this.descirption;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java
similarity index 70%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java
index e96b5104a10..d0769f5ba5b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java
@@ -23,45 +23,28 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 /** MergeLogger records the progress of a merge in file "merge.log" as text 
lines. */
-public class InnerCompactionLogger extends CompactionLogger {
+public class SimpleCompactionLogger extends CompactionLogger {
 
-  private List<TsFileResource> sourceFiles;
-  private TsFileResource targetFile;
-  private TsFileResource emptyTargetFile;
-
-  public InnerCompactionLogger(File logFile) throws IOException {
+  public SimpleCompactionLogger(File logFile) throws IOException {
     super(logFile);
-    this.sourceFiles = new ArrayList<>();
-  }
-
-  public List<TsFileResource> getSourceFiles() {
-    return sourceFiles;
-  }
-
-  public TsFileResource getTargetFile() {
-    return targetFile;
-  }
-
-  public TsFileResource getEmptyTargetFile() {
-    return emptyTargetFile;
   }
 
   public void logSourceFiles(List<TsFileResource> sourceFiles) throws 
IOException {
-    this.sourceFiles.addAll(sourceFiles);
     logFiles(sourceFiles, STR_SOURCE_FILES);
   }
 
   public void logTargetFile(TsFileResource targetFile) throws IOException {
-    this.targetFile = targetFile;
     logFile(targetFile, STR_TARGET_FILES);
   }
 
+  public void logTargetFiles(List<TsFileResource> targetFiles) throws 
IOException {
+    logFiles(targetFiles, STR_TARGET_FILES);
+  }
+
   public void logEmptyTargetFile(TsFileResource emptyTargetFile) throws 
IOException {
-    this.emptyTargetFile = emptyTargetFile;
     logFile(emptyTargetFile, STR_DELETED_TARGET_FILES);
   }
 }

Reply via email to