This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch compaction_recover_logger_1017 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5308e8c309f2357761dcef571135efb8ed41b080 Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Oct 17 17:49:23 2023 +0800 move some common method to abstract class --- .../execute/task/AbstractCompactionTask.java | 107 ++++++++++++++++++++ .../execute/task/InnerSpaceCompactionTask.java | 110 +-------------------- 2 files changed, 110 insertions(+), 107 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index 7d9512780cd..8e378b884c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -19,17 +19,26 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.util.List; @@ -41,6 +50,9 @@ import java.util.List; * finished. The future returns the {@link CompactionTaskSummary} of this task execution. */ public abstract class AbstractCompactionTask { + protected static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); + protected String dataRegionId; protected String storageGroupName; protected long timePartition; @@ -110,6 +122,8 @@ public abstract class AbstractCompactionTask { protected abstract boolean doCompaction(); + protected abstract void recover(); + protected void printLogWhenException(Logger logger, Exception e) { if (e instanceof InterruptedException) { logger.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId); @@ -198,6 +212,99 @@ public abstract class AbstractCompactionTask { } } + protected boolean checkAllSourceFileExists(List<TsFileResource> tsFileResources) { + for (TsFileResource tsFileResource : tsFileResources) { + if (!tsFileResource.getTsFile().exists() || !tsFileResource.resourceFileExists()) { + return false; + } + } + return true; + } + + protected void handleRecoverException(Exception e) { + LOGGER.error( + "{} [Compaction][Recover] Failed to recover compaction. TaskInfo: {}, Exception: {}", + dataRegionId, + this, + e); + tsFileManager.setAllowCompaction(false); + LOGGER.error("stop compaction because of exception during recovering"); + // 考虑是否需要将系统设置为 READONLY + CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly); + } + + protected void insertFilesToTsFileManager(List<TsFileResource> tsFiles) throws IOException { + for (TsFileResource tsFileResource : tsFiles) { + tsFileManager.keepOrderInsert(tsFileResource, tsFileResource.isSeq()); + } + } + + protected void removeTsFileInMemory(List<TsFileResource> resourceList) { + tsFileManager.writeLock("CompactionExceptionHandler"); + try { + for (TsFileResource targetTsFile : resourceList) { + if (targetTsFile == null) { + // target file has been deleted due to empty after compaction + continue; + } + tsFileManager.remove(targetTsFile, targetTsFile.isSeq()); + } + } finally { + tsFileManager.writeUnlock(); + } + } + + public File getRealTargetFile(TsFileIdentifier targetFileIdentifier, String suffix) { + File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs(); + File targetFile = + getFileFromDataDirs( + targetFileIdentifier.getFilePath().replace(suffix, TsFileConstant.TSFILE_SUFFIX)); + return tmpTargetFile != null ? tmpTargetFile : targetFile; + } + + /** + * This method find the File object of given filePath by searching it in every data directory. If + * the file is not found, it will return null. + */ + public File getFileFromDataDirs(String filePath) { + String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs(); + for (String dataDir : dataDirs) { + File f = new File(dataDir, filePath); + if (f.exists()) { + return f; + } + } + return null; + } + + protected void deleteCompactionModsFile(List<TsFileResource> tsFileResourceList) + throws IOException { + for (TsFileResource seqFile : tsFileResourceList) { + ModificationFile modificationFile = seqFile.getCompactionModFile(); + if (modificationFile.exists()) { + modificationFile.remove(); + } + } + } + + protected boolean deleteTsFilesOnDisk(List<TsFileResource> tsFiles) { + for (TsFileResource resource : tsFiles) { + if (!deleteTsFileOnDisk(resource)) { + return false; + } + } + return true; + } + + protected boolean deleteTsFileOnDisk(TsFileResource tsFileResource) { + tsFileResource.writeLock(); + try { + return tsFileResource.remove(); + } finally { + tsFileResource.writeUnlock(); + } + } + public boolean isTaskRan() { return summary.isRan(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 2dc544b4b57..47575d8ea11 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; -import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.metrics.CompactionMetrics; @@ -40,19 +38,15 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.val import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator; -import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; -import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; import org.apache.iotdb.tsfile.utils.TsFileUtils; import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -62,8 +56,6 @@ import java.util.List; import java.util.Objects; public class InnerSpaceCompactionTask extends AbstractCompactionTask { - private static final Logger LOGGER = - LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); protected List<TsFileResource> selectedTsFileResourceList; protected TsFileResource targetTsFileResource; @@ -118,7 +110,9 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { sourceFileIdentifiers.forEach( f -> this.selectedTsFileResourceList.add(new TsFileResource(f.getFileFromDataDirs()))); if (targetFileIdentifiers.size() > 0) { - File targetFileOnDisk = getRealTargetFile(targetFileIdentifiers.get(0)); + File targetFileOnDisk = + getRealTargetFile( + targetFileIdentifiers.get(0), IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX); // The targetFileOnDisk may be null, but it won't impact the task recover stage this.targetTsFileResource = new TsFileResource(targetFileOnDisk); } @@ -353,39 +347,6 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { } } - protected void handleRecoverException(Exception e) { - LOGGER.error( - "{} [Compaction][Recover] Failed to recover compaction. TaskInfo: {}, Exception: {}", - dataRegionId, - this, - e); - tsFileManager.setAllowCompaction(false); - LOGGER.error("stop compaction because of exception during recovering"); - // 考虑是否需要将系统设置为 READONLY - CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly); - } - - private void removeTsFileInMemory(List<TsFileResource> resourceList) { - tsFileManager.writeLock("CompactionExceptionHandler"); - try { - for (TsFileResource targetTsFile : resourceList) { - if (targetTsFile == null) { - // target file has been deleted due to empty after compaction - continue; - } - tsFileManager.remove(targetTsFile, targetTsFile.isSeq()); - } - } finally { - tsFileManager.writeUnlock(); - } - } - - private void insertFilesToTsFileManager(List<TsFileResource> tsFiles) throws IOException { - for (TsFileResource tsFileResource : tsFiles) { - tsFileManager.keepOrderInsert(tsFileResource, tsFileResource.isSeq()); - } - } - private void rollback() throws Exception { // if the task has started, if (recoverMemoryStatus) { @@ -429,75 +390,10 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { deleteCompactionModsFile(selectedTsFileResourceList); } - /** - * This method find the File object of given filePath by searching it in every data directory. If - * the file is not found, it will return null. - */ - public File getFileFromDataDirs(String filePath) { - String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs(); - for (String dataDir : dataDirs) { - File f = new File(dataDir, filePath); - if (f.exists()) { - return f; - } - } - return null; - } - - public File getRealTargetFile(TsFileIdentifier targetFileIdentifier) { - File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs(); - File targetFile = - getFileFromDataDirs( - targetFileIdentifier - .getFilePath() - .replace( - IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX)); - return tmpTargetFile != null ? tmpTargetFile : targetFile; - } - - public void deleteCompactionModsFile(List<TsFileResource> tsFileResourceList) throws IOException { - for (TsFileResource seqFile : tsFileResourceList) { - ModificationFile modificationFile = seqFile.getCompactionModFile(); - if (modificationFile.exists()) { - modificationFile.remove(); - } - } - } - - private boolean deleteTsFilesOnDisk(List<TsFileResource> tsFiles) { - for (TsFileResource resource : tsFiles) { - if (!deleteTsFileOnDisk(resource)) { - return false; - } - } - return true; - } - - private boolean deleteTsFileOnDisk(TsFileResource tsFileResource) { - tsFileResource.writeLock(); - try { - if (!tsFileResource.remove()) { - return false; - } - } finally { - targetTsFileResource.writeUnlock(); - } - return true; - } - private boolean shouldRollback() { return checkAllSourceFileExists(selectedTsFileResourceList); } - private boolean checkAllSourceFileExists(List<TsFileResource> tsFileResources) { - for (TsFileResource tsFileResource : tsFileResources) { - if (!tsFileResource.getTsFile().exists() || !tsFileResource.resourceFileExists()) { - return false; - } - } - return true; - } - @Override public boolean equalsOtherTask(AbstractCompactionTask otherTask) { if (!(otherTask instanceof InnerSpaceCompactionTask)) {
