This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch compaction_recover_logger_1017 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a83a61097b0e70937dcd49e34c69850408cc6c38 Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Oct 17 18:25:30 2023 +0800 complete cross refactor --- .../execute/task/AbstractCompactionTask.java | 1 + .../execute/task/CrossSpaceCompactionTask.java | 114 +++++++++++++++++++++ .../execute/task/InnerSpaceCompactionTask.java | 2 - 3 files changed, 115 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index 8e378b884c6..18661335876 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -66,6 +66,7 @@ public abstract class AbstractCompactionTask { protected long memoryCost = 0L; + protected boolean recoverMemoryStatus; protected CompactionTaskType compactionTaskType; protected AbstractCompactionTask( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java index 021e61389db..83af4af0c5a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java @@ -23,23 +23,28 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.iotdb.tsfile.utils.TsFileUtils; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -50,10 +55,13 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { protected List<TsFileResource> selectedUnsequenceFiles; private File logFile; protected List<TsFileResource> targetTsfileResourceList; + private List<TsFileResource> 
emptyTargetTsFileResourceList; protected List<TsFileResource> holdWriteLockList = new ArrayList<>(); protected double selectedSeqFileSize = 0; protected double selectedUnseqFileSize = 0; + protected boolean needRecoverTaskInfoFromLogFile; + @SuppressWarnings("squid:S107") public CrossSpaceCompactionTask( long timePartition, @@ -71,6 +79,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { serialId); this.selectedSequenceFiles = selectedSequenceFiles; this.selectedUnsequenceFiles = selectedUnsequenceFiles; + this.emptyTargetTsFileResourceList = new ArrayList<>(); this.performer = performer; this.hashCode = this.toString().hashCode(); this.memoryCost = memoryCost; @@ -79,9 +88,43 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { createSummary(); } + public CrossSpaceCompactionTask( + String databaseName, String dataRegionId, TsFileManager tsFileManager, File logFile) { + super(databaseName, dataRegionId, 0L, tsFileManager, 0L, CompactionTaskType.NORMAL); + this.logFile = logFile; + this.needRecoverTaskInfoFromLogFile = true; + } + + private void recoverTaskInfoFromLogFile() throws IOException { + CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(this.logFile); + logAnalyzer.analyze(); + List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos(); + List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos(); + List<TsFileIdentifier> deletedTargetFileIdentifiers = logAnalyzer.getDeletedTargetFileInfos(); + this.selectedSequenceFiles = new ArrayList<>(); + sourceFileIdentifiers.stream() + .filter(TsFileIdentifier::isSequence) + .forEach(f -> this.selectedSequenceFiles.add(new TsFileResource(f.getFileFromDataDirs()))); + sourceFileIdentifiers.stream() + .filter(f -> !f.isSequence()) + .forEach( + f -> this.selectedUnsequenceFiles.add(new TsFileResource(f.getFileFromDataDirs()))); + + for (TsFileIdentifier f : targetFileIdentifiers) { + File targetFileOnDisk = 
getRealTargetFile(f, IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX); + // The targetFileOnDisk may be null, but it won't impact the task recover stage + TsFileResource targetTsFile = new TsFileResource(targetFileOnDisk); + this.targetTsfileResourceList.add(targetTsFile); + if (deletedTargetFileIdentifiers.contains(f)) { + this.emptyTargetTsFileResourceList.add(targetTsFile); + } + } + } + @Override @SuppressWarnings({"squid:S6541", "squid:S3776", "squid:S2142"}) public boolean doCompaction() { + recoverMemoryStatus = true; boolean isSuccess = true; try { if (!tsFileManager.isAllowCompaction()) { @@ -163,6 +206,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { // find empty target files and add log for (TsFileResource targetResource : targetTsfileResourceList) { if (targetResource.isDeleted()) { + emptyTargetTsFileResourceList.add(targetResource); compactionLogger.logFile(targetResource, CompactionLogger.STR_DELETED_TARGET_FILES); compactionLogger.force(); } @@ -273,6 +317,76 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { return isSuccess; } + public void recover() { + try { + if (needRecoverTaskInfoFromLogFile) { + recoverTaskInfoFromLogFile(); + } + if (shouldRollback()) { + rollback(); + } else { + // shouldRollback() returned false, so the task is completed via finishTask() instead + finishTask(); + } + } catch (Exception e) { + handleRecoverException(e); + } + } + + private boolean shouldRollback() { + return checkAllSourceFileExists(selectedSequenceFiles) + && checkAllSourceFileExists(selectedUnsequenceFiles); + } + + private void rollback() throws Exception { + // if doCompaction() has started (it sets recoverMemoryStatus), also undo the in-memory changes + if (recoverMemoryStatus) { + removeTsFileInMemory(targetTsfileResourceList); + insertFilesToTsFileManager(selectedSequenceFiles); + insertFilesToTsFileManager(selectedUnsequenceFiles); + } + deleteCompactionModsFile(selectedSequenceFiles); + deleteCompactionModsFile(selectedUnsequenceFiles); + // delete target file + if (targetTsfileResourceList != null) { + if
(!deleteTsFilesOnDisk(targetTsfileResourceList)) { + throw new CompactionRecoverException("failed to delete target file %s"); + } + } + } + + private void finishTask() throws IOException { + for (TsFileResource target : targetTsfileResourceList) { + if (target.isDeleted() || emptyTargetTsFileResourceList.contains(target)) { + // it means the target file is empty after compaction + if (target.remove()) { + throw new CompactionRecoverException( + String.format("failed to delete empty target file %s", target)); + } + } else { + File targetFile = target.getTsFile(); + if (targetFile == null || !TsFileUtils.isTsFileComplete(target.getTsFile())) { + throw new CompactionRecoverException( + String.format("Target file is not completed. %s", targetFile)); + } + if (recoverMemoryStatus) { + target.setStatus(TsFileResourceStatus.NORMAL); + } + } + } + if (!deleteTsFilesOnDisk(selectedSequenceFiles) + || !deleteTsFilesOnDisk(selectedUnsequenceFiles)) { + throw new CompactionRecoverException("source files cannot be deleted successfully"); + } + if (recoverMemoryStatus) { + // the statistics here may be inaccurate + FileMetrics.getInstance().deleteTsFile(true, selectedSequenceFiles); + FileMetrics.getInstance().deleteTsFile(true, selectedUnsequenceFiles); + } + deleteCompactionModsFile(selectedSequenceFiles); + deleteCompactionModsFile(selectedUnsequenceFiles); + } + @Override public boolean equalsOtherTask(AbstractCompactionTask otherTask) { if (!(otherTask instanceof CrossSpaceCompactionTask)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 47575d8ea11..8f48cbbdf88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -72,8 +72,6 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { protected boolean[] isHoldingWriteLock; protected long maxModsFileSize; protected AbstractInnerSpaceEstimator innerSpaceEstimator; - - protected boolean recoverMemoryStatus; protected boolean needRecoverTaskInfoFromLogFile; public InnerSpaceCompactionTask(
