This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch compaction_recover_logger_1017 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1153fad0f881bf80742e64b7342cf6eba8583fff Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Oct 17 23:50:25 2023 +0800 add compaction task stage --- .../execute/task/AbstractCompactionTask.java | 7 +- .../execute/task/CrossSpaceCompactionTask.java | 45 +--- .../execute/task/InnerSpaceCompactionTask.java | 264 ++++++++++----------- .../execute/utils/log/CompactionLogAnalyzer.java | 17 +- .../execute/utils/log/CompactionLogger.java | 6 + ...pactionLogger.java => CompactionTaskStage.java} | 19 +- ...tionLogger.java => SimpleCompactionLogger.java} | 29 +-- 7 files changed, 189 insertions(+), 198 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index eb17f10f8c0..3be3ee32fd7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionTaskStage; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; @@ -63,7 +64,7 @@ public abstract class AbstractCompactionTask { protected long serialId; protected boolean crossTask; protected boolean innerSeqTask; - + protected CompactionTaskStage taskStage; protected long memoryCost = 0L; protected boolean recoverMemoryStatus; @@ -312,6 +313,10 @@ public abstract class AbstractCompactionTask { } } + public void setTaskStage(CompactionTaskStage stage) { + this.taskStage = stage; + } + public boolean isTaskRan() { return summary.isRan(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java index eaaf78df9c1..f03ab79f12c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer; @@ -31,6 +30,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subt import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; @@ -39,12 +39,12 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.tsfile.utils.TsFileUtils; -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -121,6 +121,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { this.emptyTargetTsFileResourceList.add(targetTsFile); } } + this.taskStage = logAnalyzer.getTaskStage(); } @Override @@ -178,11 +179,11 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { + targetTsfileResourceList.get(0).getTsFile().getName() + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX); - try (CompactionLogger compactionLogger = new CompactionLogger(logFile)) { + try (SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(logFile)) { // print the path of the temporary file first for priority check during recovery - compactionLogger.logFiles(selectedSequenceFiles, CompactionLogger.STR_SOURCE_FILES); - compactionLogger.logFiles(selectedUnsequenceFiles, CompactionLogger.STR_SOURCE_FILES); - compactionLogger.logFiles(targetTsfileResourceList, CompactionLogger.STR_TARGET_FILES); + compactionLogger.logSourceFiles(selectedSequenceFiles); + compactionLogger.logSourceFiles(selectedUnsequenceFiles); + compactionLogger.logTargetFiles(targetTsfileResourceList); compactionLogger.force(); performer.setSourceFiles(selectedSequenceFiles, selectedUnsequenceFiles); @@ -209,7 +210,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { for (TsFileResource targetResource : targetTsfileResourceList) { if (targetResource.isDeleted()) { emptyTargetTsFileResourceList.add(targetResource); - compactionLogger.logFile(targetResource, CompactionLogger.STR_DELETED_TARGET_FILES); + compactionLogger.logEmptyTargetFile(targetResource); compactionLogger.force(); } } @@ -284,35 +285,11 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { (selectedSeqFileSize + selectedUnseqFileSize) / 1024.0d / 1024.0d / costTime), summary); } - if (logFile.exists()) { - FileUtils.delete(logFile); - } + Files.deleteIfExists(logFile.toPath()); } catch (Exception e) { isSuccess = false; - // catch throwable to handle OOM errors - if (!(e instanceof InterruptedException)) { - LOGGER.error( - "{}-{} [Compaction] Meet errors in cross space compaction.", - storageGroupName, - dataRegionId, - e); - } else { - LOGGER.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId); - // clean the interrupted flag - Thread.interrupted(); - } - - // handle exception - CompactionExceptionHandler.handleException( - storageGroupName + "-" + dataRegionId, - logFile, - targetTsfileResourceList, - selectedSequenceFiles, - selectedUnsequenceFiles, - tsFileManager, - timePartition, - false, - true); + printLogWhenException(LOGGER, e); + recover(); } finally { releaseAllLocks(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 8644b5a054c..180ba500877 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -32,7 +32,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subt import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.InnerCompactionLogger; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; @@ -46,10 +46,9 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; import org.apache.iotdb.tsfile.utils.TsFileUtils; -import org.apache.commons.io.FileUtils; - import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -65,9 +64,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { protected int sumOfCompactionCount; protected long maxFileVersion; protected int maxCompactionCount; - private File logFile; - private InnerCompactionLogger compactionLogger; protected List<TsFileResource> targetTsFileList; protected boolean[] isHoldingWriteLock; protected long maxModsFileSize; @@ -115,6 +112,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { this.targetTsFileResource = new TsFileResource(targetFileOnDisk); } this.isTargetTsFileEmpty = deletedTargetFileIdentifiers.size() > 0; + this.taskStage = logAnalyzer.getTaskStage(); } public InnerSpaceCompactionTask( @@ -184,141 +182,141 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { targetTsFileResource = TsFileNameGenerator.getInnerCompactionTargetFileResource( selectedTsFileResourceList, sequence); - compactionLogger = new InnerCompactionLogger(logFile); - - // Here is tmpTargetFile, which is xxx.target - targetTsFileList = new ArrayList<>(Collections.singletonList(targetTsFileResource)); - compactionLogger.logSourceFiles(selectedTsFileResourceList); - compactionLogger.logTargetFile(targetTsFileResource); - compactionLogger.force(); - LOGGER.info( - "{}-{} [Compaction] compaction with {}", - storageGroupName, - dataRegionId, - selectedTsFileResourceList); - - // carry out the compaction - performer.setSourceFiles(selectedTsFileResourceList); - // As elements in targetFiles may be removed in ReadPointCompactionPerformer, we should use - // a - // mutable list instead of Collections.singletonList() - performer.setTargetFiles(targetTsFileList); - performer.setSummary(summary); - performer.perform(); - - CompactionUtils.updateProgressIndex( - targetTsFileList, selectedTsFileResourceList, Collections.emptyList()); - CompactionUtils.moveTargetFile(targetTsFileList, true, storageGroupName + "-" + dataRegionId); - - LOGGER.info( - "{}-{} [InnerSpaceCompactionTask] start to rename mods file", - storageGroupName, - dataRegionId); - CompactionUtils.combineModsInInnerCompaction( - selectedTsFileResourceList, targetTsFileResource); - - if (Thread.currentThread().isInterrupted() || summary.isCancel()) { - throw new InterruptedException( - String.format("%s-%s [Compaction] abort", storageGroupName, dataRegionId)); - } - - // replace the old files with new file, the new is in same position as the old - if (sequence) { - tsFileManager.replace( - selectedTsFileResourceList, - Collections.emptyList(), - targetTsFileList, - timePartition, - true); - } else { - tsFileManager.replace( - Collections.emptyList(), - selectedTsFileResourceList, - targetTsFileList, - timePartition, - false); - } - - if (targetTsFileResource.isDeleted()) { - compactionLogger.logEmptyTargetFile(targetTsFileResource); - isTargetTsFileEmpty = true; + try (SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(logFile)) { + // Here is tmpTargetFile, which is xxx.target + targetTsFileList = new ArrayList<>(Collections.singletonList(targetTsFileResource)); + compactionLogger.logSourceFiles(selectedTsFileResourceList); + compactionLogger.logTargetFile(targetTsFileResource); compactionLogger.force(); - } + LOGGER.info( + "{}-{} [Compaction] compaction with {}", + storageGroupName, + dataRegionId, + selectedTsFileResourceList); + + // carry out the compaction + performer.setSourceFiles(selectedTsFileResourceList); + // As elements in targetFiles may be removed in ReadPointCompactionPerformer, we should use + // a + // mutable list instead of Collections.singletonList() + performer.setTargetFiles(targetTsFileList); + performer.setSummary(summary); + performer.perform(); + + CompactionUtils.updateProgressIndex( + targetTsFileList, selectedTsFileResourceList, Collections.emptyList()); + CompactionUtils.moveTargetFile( + targetTsFileList, true, storageGroupName + "-" + dataRegionId); + + LOGGER.info( + "{}-{} [InnerSpaceCompactionTask] start to rename mods file", + storageGroupName, + dataRegionId); + CompactionUtils.combineModsInInnerCompaction( + selectedTsFileResourceList, targetTsFileResource); + + if (Thread.currentThread().isInterrupted() || summary.isCancel()) { + throw new InterruptedException( + String.format("%s-%s [Compaction] abort", storageGroupName, dataRegionId)); + } - CompactionValidator validator = CompactionValidator.getInstance(); - if (!validator.validateCompaction( - tsFileManager, targetTsFileList, storageGroupName, timePartition, !sequence)) { - LOGGER.error( - "Failed to pass compaction validation, source files is: {}, target files is {}", - selectedTsFileResourceList, - targetTsFileList); - throw new CompactionValidationFailedException("Failed to pass compaction validation"); - } + // replace the old files with new file, the new is in same position as the old + if (sequence) { + tsFileManager.replace( + selectedTsFileResourceList, + Collections.emptyList(), + targetTsFileList, + timePartition, + true); + } else { + tsFileManager.replace( + Collections.emptyList(), + selectedTsFileResourceList, + targetTsFileList, + timePartition, + false); + } - LOGGER.info( - "{}-{} [Compaction] Compacted target files, try to get the write lock of source files", - storageGroupName, - dataRegionId); + if (targetTsFileResource.isDeleted()) { + compactionLogger.logEmptyTargetFile(targetTsFileResource); + isTargetTsFileEmpty = true; + compactionLogger.force(); + } - // release the read lock of all source files, and get the write lock of them to delete them - for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { - selectedTsFileResourceList.get(i).writeLock(); - isHoldingWriteLock[i] = true; - } + CompactionValidator validator = CompactionValidator.getInstance(); + if (!validator.validateCompaction( + tsFileManager, targetTsFileList, storageGroupName, timePartition, !sequence)) { + LOGGER.error( + "Failed to pass compaction validation, source files is: {}, target files is {}", + selectedTsFileResourceList, + targetTsFileList); + throw new CompactionValidationFailedException("Failed to pass compaction validation"); + } - if (targetTsFileResource.getTsFile().exists() - && targetTsFileResource.getTsFile().length() - < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) { - // the file size is smaller than magic string and version number - throw new TsFileNotCompleteException( - String.format( - "target file %s is smaller than magic string and version number size", - targetTsFileResource)); - } + LOGGER.info( + "{}-{} [Compaction] Compacted target files, try to get the write lock of source files", + storageGroupName, + dataRegionId); + // release the read lock of all source files, and get the write lock of them to delete them + for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { + selectedTsFileResourceList.get(i).writeLock(); + isHoldingWriteLock[i] = true; + } - LOGGER.info( - "{}-{} [Compaction] compaction finish, start to delete old files", - storageGroupName, - dataRegionId); - CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(selectedTsFileResourceList, sequence); - CompactionUtils.deleteModificationForSourceFile( - selectedTsFileResourceList, storageGroupName + "-" + dataRegionId); - - // inner space compaction task has only one target file - if (!targetTsFileResource.isDeleted()) { - FileMetrics.getInstance() - .addTsFile( - targetTsFileResource.getDatabaseName(), - targetTsFileResource.getDataRegionId(), - targetTsFileResource.getTsFile().length(), - sequence, - targetTsFileResource.getTsFile().getName()); - - // set target resource to CLOSED, so that it can be selected to compact - targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL); - } else { - // target resource is empty after compaction, then delete it - targetTsFileResource.remove(); + if (targetTsFileResource.getTsFile().exists() + && targetTsFileResource.getTsFile().length() + < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) { + // the file size is smaller than magic string and version number + throw new TsFileNotCompleteException( + String.format( + "target file %s is smaller than magic string and version number size", + targetTsFileResource)); + } + + LOGGER.info( + "{}-{} [Compaction] compaction finish, start to delete old files", + storageGroupName, + dataRegionId); + CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics( + selectedTsFileResourceList, sequence); + CompactionUtils.deleteModificationForSourceFile( + selectedTsFileResourceList, storageGroupName + "-" + dataRegionId); + + // inner space compaction task has only one target file + if (!targetTsFileResource.isDeleted()) { + FileMetrics.getInstance() + .addTsFile( + targetTsFileResource.getDatabaseName(), + targetTsFileResource.getDataRegionId(), + targetTsFileResource.getTsFile().length(), + sequence, + targetTsFileResource.getTsFile().getName()); + + // set target resource to CLOSED, so that it can be selected to compact + targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL); + } else { + // target resource is empty after compaction, then delete it + targetTsFileResource.remove(); + } + CompactionMetrics.getInstance().recordSummaryInfo(summary); + + double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; + LOGGER.info( + "{}-{} [Compaction] {} InnerSpaceCompaction task finishes successfully, " + + "target file is {}," + + "time cost is {} s, " + + "compaction speed is {} MB/s, {}", + storageGroupName, + dataRegionId, + sequence ? "Sequence" : "Unsequence", + targetTsFileResource.getTsFile().getName(), + String.format("%.2f", costTime), + String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d / costTime), + summary); + + // 在何时关闭 } - CompactionMetrics.getInstance().recordSummaryInfo(summary); - - double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; - LOGGER.info( - "{}-{} [Compaction] {} InnerSpaceCompaction task finishes successfully, " - + "target file is {}," - + "time cost is {} s, " - + "compaction speed is {} MB/s, {}", - storageGroupName, - dataRegionId, - sequence ? "Sequence" : "Unsequence", - targetTsFileResource.getTsFile().getName(), - String.format("%.2f", costTime), - String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d / costTime), - summary); - - // 在何时关闭 - compactionLogger.close(); - FileUtils.delete(logFile); + Files.deleteIfExists(logFile.toPath()); } catch (Exception e) { isSuccess = false; printLogWhenException(LOGGER, e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java index 031a7f95a7a..0baa7117392 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java @@ -25,6 +25,7 @@ import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Stream; import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES; import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES; @@ -36,6 +37,7 @@ public class CompactionLogAnalyzer { private final List<TsFileIdentifier> sourceFileInfos = new ArrayList<>(); private final List<TsFileIdentifier> targetFileInfos = new ArrayList<>(); private final List<TsFileIdentifier> deletedTargetFileInfos = new ArrayList<>(); + private CompactionTaskStage taskStage; private boolean isLogFromOld = false; public CompactionLogAnalyzer(File logFile) { @@ -48,9 +50,10 @@ public class CompactionLogAnalyzer { * @throws IOException if io errors occurred */ public void analyze() throws IOException { - String currLine; try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) { + String currLine; while ((currLine = bufferedReader.readLine()) != null) { + final String lineValue = currLine; String fileInfo; if (currLine.startsWith(STR_SOURCE_FILES)) { fileInfo = currLine.replaceFirst(STR_SOURCE_FILES + TsFileIdentifier.INFO_SEPARATOR, ""); @@ -58,10 +61,16 @@ public class CompactionLogAnalyzer { } else if (currLine.startsWith(STR_TARGET_FILES)) { fileInfo = currLine.replaceFirst(STR_TARGET_FILES + TsFileIdentifier.INFO_SEPARATOR, ""); targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo)); - } else { + } else if (currLine.startsWith(STR_DELETED_TARGET_FILES)) { fileInfo = currLine.replaceFirst(STR_DELETED_TARGET_FILES + TsFileIdentifier.INFO_SEPARATOR, ""); deletedTargetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo)); + } else if (Stream.of(CompactionTaskStage.values()) + .anyMatch(stage -> lineValue.startsWith(stage.name()))) { + taskStage = CompactionTaskStage.valueOf(currLine); + } else { + throw new IllegalArgumentException( + String.format("unknown compaction log line: %s", currLine)); } } } @@ -79,7 +88,7 @@ public class CompactionLogAnalyzer { return deletedTargetFileInfos; } - public boolean isLogFromOld() { - return isLogFromOld; + public CompactionTaskStage getTaskStage() { + return taskStage; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java index 248726e34e1..2ded4a27b73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java @@ -60,6 +60,12 @@ public class CompactionLogger implements AutoCloseable { } } + public void logTaskStage(CompactionTaskStage stage) throws IOException { + logStream.write(stage.name().getBytes()); + logStream.write(System.lineSeparator().getBytes()); + logStream.flush(); + } + public void logFile(TsFileResource tsFile, String flag) throws IOException { String log = flag diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java similarity index 63% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java index 15473f68e41..9a549a1790b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java @@ -19,8 +19,21 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log; -import java.util.List; +public enum CompactionTaskStage { + NOT_STARTED("task does not started"), + STARTED("task has started"), + TARGET_FILE_GENERATED("target file has been generated completely"), + TARGET_FILE_REPLACED("target file has been replaced in TsFileManager"), + SOURCE_FILE_CLEANED("source files are cleaned up"), + FINISHED("task finished"); -public interface ICompactionLogger { - List<String> getSourceFileList(); + private final String descirption; + + CompactionTaskStage(String descirption) { + this.descirption = descirption; + } + + public String getDescription() { + return this.descirption; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java similarity index 70% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java index e96b5104a10..d0769f5ba5b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java @@ -23,45 +23,28 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.List; /** MergeLogger records the progress of a merge in file "merge.log" as text lines. */ -public class InnerCompactionLogger extends CompactionLogger { +public class SimpleCompactionLogger extends CompactionLogger { - private List<TsFileResource> sourceFiles; - private TsFileResource targetFile; - private TsFileResource emptyTargetFile; - - public InnerCompactionLogger(File logFile) throws IOException { + public SimpleCompactionLogger(File logFile) throws IOException { super(logFile); - this.sourceFiles = new ArrayList<>(); - } - - public List<TsFileResource> getSourceFiles() { - return sourceFiles; - } - - public TsFileResource getTargetFile() { - return targetFile; - } - - public TsFileResource getEmptyTargetFile() { - return emptyTargetFile; } public void logSourceFiles(List<TsFileResource> sourceFiles) throws IOException { - this.sourceFiles.addAll(sourceFiles); logFiles(sourceFiles, STR_SOURCE_FILES); } public void logTargetFile(TsFileResource targetFile) throws IOException { - this.targetFile = targetFile; logFile(targetFile, STR_TARGET_FILES); } + public void logTargetFiles(List<TsFileResource> targetFiles) throws IOException { + logFiles(targetFiles, STR_TARGET_FILES); + } + public void logEmptyTargetFile(TsFileResource emptyTargetFile) throws IOException { - this.emptyTargetFile = emptyTargetFile; logFile(emptyTargetFile, STR_DELETED_TARGET_FILES); } }
