This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch compaction_recover_logger_1017 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7a8c70a2e35bda25e66776c34fe8930744cf7bb9 Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Oct 17 17:23:34 2023 +0800 complete draft version for inner compaction refactor --- .../exception/CompactionRecoverException.java | 30 ++ .../execute/recover/CompactionRecoverManager.java | 20 - .../execute/recover/CompactionRecoverTask.java | 238 +--------- .../execute/task/AbstractCompactionTask.java | 16 + .../execute/task/CrossSpaceCompactionTask.java | 7 - .../execute/task/InnerSpaceCompactionTask.java | 519 ++++++++++++++------- .../compaction/execute/utils/CompactionUtils.java | 12 + .../execute/utils/log/CompactionLogAnalyzer.java | 95 ---- .../execute/utils/log/CompactionLogger.java | 16 +- .../execute/utils/log/ICompactionLogger.java | 26 ++ .../execute/utils/log/InnerCompactionLogger.java | 67 +++ .../execute/utils/log/TsFileIdentifier.java | 30 -- 12 files changed, 503 insertions(+), 573 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionRecoverException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionRecoverException.java new file mode 100644 index 00000000000..3c63e3d602d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionRecoverException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception; + +public class CompactionRecoverException extends RuntimeException { + public CompactionRecoverException(String msg) { + super(msg); + } + + public CompactionRecoverException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java index 93576c2fecc..23e078a1f02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; -import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,13 +60,11 @@ public class CompactionRecoverManager { public void recoverInnerSpaceCompaction(boolean isSequence) { logger.info("recovering inner compaction"); - recoverCompactionBefore013(true); recoverCompaction(true, isSequence); } public void recoverCrossSpaceCompaction() { logger.info("recovering cross compaction"); - recoverCompactionBefore013(false); recoverCompaction(false, true); } @@ -138,21 +135,4 @@ public class CompactionRecoverManager { .doCompaction(); } } - - /** Check whether there is old compaction log from previous version (<0.13) and recover it. */ - private void recoverCompactionBefore013(boolean isInnerSpace) { - String oldLogName = - isInnerSpace - ? logicalStorageGroupName + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD - : CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD; - File logFileFromOld = - FSFactoryProducer.getFSFactory().getFile(tsFileManager.getStorageGroupDir(), oldLogName); - - if (logFileFromOld.exists()) { - logger.info("Calling compaction task to recover from previous version."); - new CompactionRecoverTask( - logicalStorageGroupName, dataRegionId, tsFileManager, logFileFromOld, isInnerSpace) - .doCompaction(); - } - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java index ac53ad571d3..e04fd69b3c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java @@ -25,19 +25,12 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier; -import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; -import org.apache.iotdb.db.utils.FileLoaderUtils; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; -import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.utils.TsFileUtils; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -82,16 +75,7 @@ public class CompactionRecoverTask { fullStorageGroupName, compactionLogFile); CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(compactionLogFile); - CompactionRecoverFromOld compactionRecoverFromOld = new CompactionRecoverFromOld(); - if (isInnerSpace && compactionRecoverFromOld.isInnerCompactionLogBefore013()) { - // inner compaction log from previous version (<0.13) - logAnalyzer.analyzeOldInnerCompactionLog(); - } else if (!isInnerSpace && compactionRecoverFromOld.isCrossCompactionLogBefore013()) { - // cross compaction log from previous version (<0.13) - logAnalyzer.analyzeOldCrossCompactionLog(); - } else { - logAnalyzer.analyze(); - } + logAnalyzer.analyze(); List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos(); List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos(); List<TsFileIdentifier> deletedTargetFileIdentifiers = @@ -115,24 +99,13 @@ public class CompactionRecoverTask { } if (isAllSourcesFileExisted) { - if (!isInnerSpace && logAnalyzer.isLogFromOld()) { - recoverSuccess = - compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistBefore013( - targetFileIdentifiers); - } else { - recoverSuccess = - handleWithAllSourceFilesExist(targetFileIdentifiers, sourceFileIdentifiers); - } + recoverSuccess = + handleWithAllSourceFilesExist(targetFileIdentifiers, sourceFileIdentifiers); + } else { - if (!isInnerSpace && logAnalyzer.isLogFromOld()) { - recoverSuccess = - compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostBefore013( - targetFileIdentifiers, sourceFileIdentifiers); - } else { - recoverSuccess = - handleWithSomeSourceFilesLost( - targetFileIdentifiers, deletedTargetFileIdentifiers, sourceFileIdentifiers); - } + recoverSuccess = + handleWithSomeSourceFilesLost( + targetFileIdentifiers, deletedTargetFileIdentifiers, sourceFileIdentifiers); } } } catch (IOException e) { @@ -358,201 +331,4 @@ public class CompactionRecoverTask { } return true; } - - /** - * Used to check whether it is recoverd from last version (<0.13) and perform corresponding - * process. - */ - private class CompactionRecoverFromOld { - - /** Return whether cross compaction log file is from previous version (<0.13). */ - private boolean isCrossCompactionLogBefore013() { - return compactionLogFile - .getName() - .equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD); - } - - /** Return whether inner compaction log file is from previous version (<0.13). */ - private boolean isInnerCompactionLogBefore013() { - return compactionLogFile.getName().startsWith(tsFileManager.getStorageGroupName()); - } - - /** Delete tmp target file and compaction mods file. */ - private boolean handleCrossCompactionWithAllSourceFilesExistBefore013( - List<TsFileIdentifier> targetFileIdentifiers) { - // delete tmp target file - for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) { - // xxx.tsfile.merge - File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs(); - if (tmpTargetFile != null) { - try { - Files.delete(tmpTargetFile.toPath()); - } catch (IOException e) { - logger.error( - "{} [Compaction][Recover] failed to remove file {}, exception: {}", - fullStorageGroupName, - tmpTargetFile, - e); - } - File chunkMetadataTempFile = - new File( - tmpTargetFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX); - if (chunkMetadataTempFile.exists()) { - try { - Files.delete(chunkMetadataTempFile.toPath()); - } catch (IOException e) { - logger.error( - "{} [Compaction][Recover] failed to remove file {}, exception: {}", - fullStorageGroupName, - chunkMetadataTempFile, - e); - } - } - } - } - - // delete compaction mods file - File compactionModsFileFromOld = - new File( - tsFileManager.getStorageGroupDir() - + File.separator - + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD); - return checkAndDeleteFile(compactionModsFileFromOld); - } - - /** - * 1. If target file does not exist, then move .merge file to target file <br> - * 2. If target resource file does not exist, then serialize it. <br> - * 3. Append merging modification to target mods file and delete merging mods file. <br> - * 4. Delete source files and .merge file. <br> - */ - @SuppressWarnings("squid:S3776") - private boolean handleCrossCompactionWithSomeSourceFilesLostBefore013( - List<TsFileIdentifier> targetFileIdentifiers, - List<TsFileIdentifier> sourceFileIdentifiers) { - try { - File compactionModsFileFromOld = - new File( - tsFileManager.getStorageGroupDir() - + File.separator - + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD); - List<TsFileResource> targetFileResources = new ArrayList<>(); - for (int i = 0; i < sourceFileIdentifiers.size(); i++) { - TsFileIdentifier sourceFileIdentifier = sourceFileIdentifiers.get(i); - if (sourceFileIdentifier.isSequence()) { - File tmpTargetFile = targetFileIdentifiers.get(i).getFileFromDataDirs(); - File targetFile = null; - - // move tmp target file to target file if not exist - if (tmpTargetFile != null) { - // move tmp target file to target file - String sourceFilePath = - tmpTargetFile - .getPath() - .replace( - TsFileConstant.TSFILE_SUFFIX - + IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD, - TsFileConstant.TSFILE_SUFFIX); - targetFile = TsFileNameGenerator.increaseCrossCompactionCnt(new File(sourceFilePath)); - FSFactoryProducer.getFSFactory().moveFile(tmpTargetFile, targetFile); - } else { - // target file must exist - File file = - TsFileNameGenerator.increaseCrossCompactionCnt( - new File( - targetFileIdentifiers - .get(i) - .getFilePath() - .replace( - TsFileConstant.TSFILE_SUFFIX - + IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD, - TsFileConstant.TSFILE_SUFFIX))); - - targetFile = getFileFromDataDirs(file.getPath()); - } - if (targetFile == null) { - logger.error( - "{} [Compaction][Recover] target file of source seq file {} " - + "does not exist (<0.13).", - fullStorageGroupName, - sourceFileIdentifier.getFilePath()); - return false; - } - - // serialize target resource file if not exist - TsFileResource targetResource = new TsFileResource(targetFile); - if (!targetResource.resourceFileExists()) { - try (TsFileSequenceReader reader = - new TsFileSequenceReader(targetFile.getAbsolutePath())) { - FileLoaderUtils.updateTsFileResource(reader, targetResource); - } - targetResource.serialize(); - } - - targetFileResources.add(targetResource); - - // append compaction modifications to target mods file and delete compaction mods file - if (compactionModsFileFromOld.exists()) { - ModificationFile compactionModsFile = - new ModificationFile(compactionModsFileFromOld.getPath()); - appendCompactionModificationsBefore013(targetResource, compactionModsFile); - } - - // delete tmp target file - if (!checkAndDeleteFile(tmpTargetFile)) { - return false; - } - } - - // delete source tsfile - File sourceFile = sourceFileIdentifier.getFileFromDataDirs(); - if (!checkAndDeleteFile(sourceFile)) { - return false; - } - - // delete source resource file - sourceFile = - getFileFromDataDirs( - sourceFileIdentifier.getFilePath() + TsFileResource.RESOURCE_SUFFIX); - if (!checkAndDeleteFile(sourceFile)) { - return false; - } - - // delete source mods file - sourceFile = - getFileFromDataDirs( - sourceFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX); - if (!checkAndDeleteFile(sourceFile)) { - return false; - } - } - - // delete compaction mods file - if (!checkAndDeleteFile(compactionModsFileFromOld)) { - return false; - } - } catch (IOException e) { - logger.error( - "{} [Compaction][Recover] fail to handle with some source files lost from old version.", - fullStorageGroupName, - e); - return false; - } - - return true; - } - - private void appendCompactionModificationsBefore013( - TsFileResource resource, ModificationFile compactionModsFile) throws IOException { - if (compactionModsFile != null) { - for (Modification modification : compactionModsFile.getModifications()) { - // we have to set modification offset to MAX_VALUE, as the offset of source chunk may - // change after compaction - modification.setFileOffset(Long.MAX_VALUE); - resource.getModFile().write(modification); - } - resource.getModFile().close(); - } - } - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index 02412723c48..7d9512780cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -28,6 +28,8 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.slf4j.Logger; + import java.io.IOException; import java.util.List; @@ -108,6 +110,20 @@ public abstract class AbstractCompactionTask { protected abstract boolean doCompaction(); + protected void printLogWhenException(Logger logger, Exception e) { + if (e instanceof InterruptedException) { + logger.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId); + Thread.currentThread().interrupt(); + } else { + logger.error( + "{}-{} [Compaction] Meet errors {}.", + compactionTaskType, + storageGroupName, + dataRegionId, + e); + } + } + public boolean start() { boolean isSuccess = false; summary.start(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java index 4f9e23614ce..021e61389db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java @@ -32,7 +32,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; @@ -49,8 +48,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); protected List<TsFileResource> selectedSequenceFiles; protected List<TsFileResource> selectedUnsequenceFiles; - protected TsFileResourceList seqTsFileResourceList; - protected TsFileResourceList unseqTsFileResourceList; private File logFile; protected List<TsFileResource> targetTsfileResourceList; protected List<TsFileResource> holdWriteLockList = new ArrayList<>(); @@ -74,10 +71,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { serialId); this.selectedSequenceFiles = selectedSequenceFiles; this.selectedUnsequenceFiles = selectedUnsequenceFiles; - this.seqTsFileResourceList = - tsFileManager.getOrCreateSequenceListByTimePartition(timePartition); - this.unseqTsFileResourceList = - tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition); this.performer = performer; this.hashCode = this.toString().hashCode(); this.memoryCost = memoryCost; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index ccde5bf6623..2dc544b4b57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -19,29 +19,36 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.InnerCompactionLogger; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; +import org.apache.iotdb.tsfile.utils.TsFileUtils; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -60,21 +67,23 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { protected List<TsFileResource> selectedTsFileResourceList; protected TsFileResource targetTsFileResource; + protected boolean isTargetTsFileEmpty; protected boolean sequence; protected long selectedFileSize; protected int sumOfCompactionCount; protected long maxFileVersion; protected int maxCompactionCount; - private File logFile; - protected TsFileResourceList tsFileResourceList; + private File logFile; + private InnerCompactionLogger compactionLogger; protected List<TsFileResource> targetTsFileList; protected boolean[] isHoldingWriteLock; - protected long maxModsFileSize; - protected AbstractInnerSpaceEstimator innerSpaceEstimator; + protected boolean recoverMemoryStatus; + protected boolean needRecoverTaskInfoFromLogFile; + public InnerSpaceCompactionTask( long timePartition, TsFileManager tsFileManager, @@ -92,6 +101,30 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { CompactionTaskType.NORMAL); } + public InnerSpaceCompactionTask( + String databaseName, String dataRegionId, TsFileManager tsFileManager, File logFile) { + super(databaseName, dataRegionId, 0L, tsFileManager, 0L, CompactionTaskType.NORMAL); + this.logFile = logFile; + this.needRecoverTaskInfoFromLogFile = true; + } + + private void recoverTaskInfoFromLogFile() throws IOException { + CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(this.logFile); + logAnalyzer.analyze(); + List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos(); + List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos(); + List<TsFileIdentifier> deletedTargetFileIdentifiers = logAnalyzer.getDeletedTargetFileInfos(); + this.selectedTsFileResourceList = new ArrayList<>(); + sourceFileIdentifiers.forEach( + f -> this.selectedTsFileResourceList.add(new TsFileResource(f.getFileFromDataDirs()))); + if (targetFileIdentifiers.size() > 0) { + File targetFileOnDisk = getRealTargetFile(targetFileIdentifiers.get(0)); + // The targetFileOnDisk may be null, but it won't impact the task recover stage + this.targetTsFileResource = new TsFileResource(targetFileOnDisk); + } + this.isTargetTsFileEmpty = deletedTargetFileIdentifiers.size() > 0; + } + public InnerSpaceCompactionTask( long timePartition, TsFileManager tsFileManager, @@ -121,16 +154,22 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { isHoldingWriteLock[i] = false; } - if (sequence) { - tsFileResourceList = tsFileManager.getOrCreateSequenceListByTimePartition(timePartition); - } else { - tsFileResourceList = tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition); - } this.hashCode = this.toString().hashCode(); this.innerSeqTask = sequence; this.crossTask = false; collectSelectedFilesInfo(); createSummary(); + prepareLogFile(); + } + + private void prepareLogFile() { + String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent(); + logFile = + new File( + dataDirectory + + File.separator + + targetTsFileResource.getTsFile().getName() + + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX); } @Override @@ -138,7 +177,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { protected boolean doCompaction() { long startTime = System.currentTimeMillis(); // get resource of target file - String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent(); + recoverMemoryStatus = true; LOGGER.info( "{}-{} [Compaction] {} InnerSpaceCompaction task starts with {} files, " + "total file size is {} MB.", @@ -153,190 +192,310 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { targetTsFileResource = TsFileNameGenerator.getInnerCompactionTargetFileResource( selectedTsFileResourceList, sequence); - logFile = - new File( - dataDirectory - + File.separator - + targetTsFileResource.getTsFile().getName() - + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX); - try (CompactionLogger compactionLogger = new CompactionLogger(logFile)) { - // Here is tmpTargetFile, which is xxx.target - targetTsFileList = new ArrayList<>(Collections.singletonList(targetTsFileResource)); - compactionLogger.logFiles(selectedTsFileResourceList, CompactionLogger.STR_SOURCE_FILES); - compactionLogger.logFiles(targetTsFileList, CompactionLogger.STR_TARGET_FILES); - compactionLogger.force(); - LOGGER.info( - "{}-{} [Compaction] compaction with {}", - storageGroupName, - dataRegionId, - selectedTsFileResourceList); - - // carry out the compaction - performer.setSourceFiles(selectedTsFileResourceList); - // As elements in targetFiles may be removed in ReadPointCompactionPerformer, we should use - // a - // mutable list instead of Collections.singletonList() - performer.setTargetFiles(targetTsFileList); - performer.setSummary(summary); - performer.perform(); - - CompactionUtils.updateProgressIndex( - targetTsFileList, selectedTsFileResourceList, Collections.emptyList()); - CompactionUtils.moveTargetFile( - targetTsFileList, true, storageGroupName + "-" + dataRegionId); - - LOGGER.info( - "{}-{} [InnerSpaceCompactionTask] start to rename mods file", - storageGroupName, - dataRegionId); - CompactionUtils.combineModsInInnerCompaction( - selectedTsFileResourceList, targetTsFileResource); - - if (Thread.currentThread().isInterrupted() || summary.isCancel()) { - throw new InterruptedException( - String.format("%s-%s [Compaction] abort", storageGroupName, dataRegionId)); - } - - // replace the old files with new file, the new is in same position as the old - if (sequence) { - tsFileManager.replace( - selectedTsFileResourceList, - Collections.emptyList(), - targetTsFileList, - timePartition, - true); - } else { - tsFileManager.replace( - Collections.emptyList(), - selectedTsFileResourceList, - targetTsFileList, - timePartition, - false); - } + compactionLogger = new InnerCompactionLogger(logFile); + + // Here is tmpTargetFile, which is xxx.target + targetTsFileList = new ArrayList<>(Collections.singletonList(targetTsFileResource)); + compactionLogger.logSourceFiles(selectedTsFileResourceList); + compactionLogger.logTargetFile(targetTsFileResource); + compactionLogger.force(); + LOGGER.info( + "{}-{} [Compaction] compaction with {}", + storageGroupName, + dataRegionId, + selectedTsFileResourceList); + + // carry out the compaction + performer.setSourceFiles(selectedTsFileResourceList); + // As elements in targetFiles may be removed in ReadPointCompactionPerformer, we should use + // a + // mutable list instead of Collections.singletonList() + performer.setTargetFiles(targetTsFileList); + performer.setSummary(summary); + performer.perform(); + + CompactionUtils.updateProgressIndex( + targetTsFileList, selectedTsFileResourceList, Collections.emptyList()); + CompactionUtils.moveTargetFile(targetTsFileList, true, storageGroupName + "-" + dataRegionId); + + LOGGER.info( + "{}-{} [InnerSpaceCompactionTask] start to rename mods file", + storageGroupName, + dataRegionId); + CompactionUtils.combineModsInInnerCompaction( + selectedTsFileResourceList, targetTsFileResource); + + if (Thread.currentThread().isInterrupted() || summary.isCancel()) { + throw new InterruptedException( + String.format("%s-%s [Compaction] abort", storageGroupName, dataRegionId)); + } - if (targetTsFileResource.isDeleted()) { - compactionLogger.logFile(targetTsFileResource, CompactionLogger.STR_DELETED_TARGET_FILES); - compactionLogger.force(); - } + // replace the old files with new file, the new is in same position as the old + if (sequence) { + tsFileManager.replace( + selectedTsFileResourceList, + Collections.emptyList(), + targetTsFileList, + timePartition, + true); + } else { + tsFileManager.replace( + Collections.emptyList(), + selectedTsFileResourceList, + targetTsFileList, + timePartition, + false); + } - CompactionValidator validator = CompactionValidator.getInstance(); - if (!validator.validateCompaction( - tsFileManager, targetTsFileList, storageGroupName, timePartition, !sequence)) { - LOGGER.error( - "Failed to pass compaction validation, source files is: {}, target files is {}", - selectedTsFileResourceList, - targetTsFileList); - throw new CompactionValidationFailedException("Failed to pass compaction validation"); - } + if (targetTsFileResource.isDeleted()) { + compactionLogger.logEmptyTargetFile(targetTsFileResource); + isTargetTsFileEmpty = true; + compactionLogger.force(); + } - LOGGER.info( - "{}-{} [Compaction] Compacted target files, try to get the write lock of source files", - storageGroupName, - dataRegionId); + CompactionValidator validator = CompactionValidator.getInstance(); + if (!validator.validateCompaction( + tsFileManager, targetTsFileList, storageGroupName, timePartition, !sequence)) { + LOGGER.error( + "Failed to pass compaction validation, source files is: {}, target files is {}", + selectedTsFileResourceList, + targetTsFileList); + throw new CompactionValidationFailedException("Failed to pass compaction validation"); + } - // release the read lock of all source files, and get the write lock of them to delete them - for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { - selectedTsFileResourceList.get(i).writeLock(); - isHoldingWriteLock[i] = true; - } + LOGGER.info( + "{}-{} [Compaction] Compacted target files, try to get the write lock of source files", + storageGroupName, + dataRegionId); - if (targetTsFileResource.getTsFile().exists() - && targetTsFileResource.getTsFile().length() - < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) { - // the file size is smaller than magic string and version number - throw new TsFileNotCompleteException( - String.format( - "target file %s is smaller than magic string and version number size", - targetTsFileResource)); - } + // release the read lock of all source files, and get the write lock of them to delete them + for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { + selectedTsFileResourceList.get(i).writeLock(); + isHoldingWriteLock[i] = true; + } - LOGGER.info( - "{}-{} [Compaction] compaction finish, start to delete old files", - storageGroupName, - dataRegionId); - CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics( - selectedTsFileResourceList, sequence); - CompactionUtils.deleteModificationForSourceFile( - selectedTsFileResourceList, storageGroupName + "-" + dataRegionId); - - // inner space compaction task has only one target file - if (!targetTsFileResource.isDeleted()) { - FileMetrics.getInstance() - .addTsFile( - targetTsFileResource.getDatabaseName(), - targetTsFileResource.getDataRegionId(), - targetTsFileResource.getTsFile().length(), - sequence, - targetTsFileResource.getTsFile().getName()); - - // set target resource to CLOSED, so that it can be selected to compact - targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL); - } else { - // target resource is empty after compaction, then delete it - targetTsFileResource.remove(); - } - CompactionMetrics.getInstance().recordSummaryInfo(summary); - - double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; - LOGGER.info( - "{}-{} [Compaction] {} InnerSpaceCompaction task finishes successfully, " - + "target file is {}," - + "time cost is {} s, " - + "compaction speed is {} MB/s, {}", - storageGroupName, - dataRegionId, - sequence ? "Sequence" : "Unsequence", - targetTsFileResource.getTsFile().getName(), - String.format("%.2f", costTime), - String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d / costTime), - summary); + if (targetTsFileResource.getTsFile().exists() + && targetTsFileResource.getTsFile().length() + < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) { + // the file size is smaller than magic string and version number + throw new TsFileNotCompleteException( + String.format( + "target file %s is smaller than magic string and version number size", + targetTsFileResource)); } - if (logFile.exists()) { - FileUtils.delete(logFile); + + LOGGER.info( + "{}-{} [Compaction] compaction finish, start to delete old files", + storageGroupName, + dataRegionId); + CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(selectedTsFileResourceList, sequence); + CompactionUtils.deleteModificationForSourceFile( + selectedTsFileResourceList, storageGroupName + "-" + dataRegionId); + + // inner space compaction task has only one target file + if (!targetTsFileResource.isDeleted()) { + FileMetrics.getInstance() + .addTsFile( + targetTsFileResource.getDatabaseName(), + targetTsFileResource.getDataRegionId(), + targetTsFileResource.getTsFile().length(), + sequence, + targetTsFileResource.getTsFile().getName()); + + // set target resource to CLOSED, so that it can be selected to compact + targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL); + } else { + // target resource is empty after compaction, then delete it + targetTsFileResource.remove(); } + CompactionMetrics.getInstance().recordSummaryInfo(summary); + + double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; + LOGGER.info( + "{}-{} [Compaction] {} InnerSpaceCompaction task finishes successfully, " + + "target file is {}," + + "time cost is {} s, " + + "compaction speed is {} MB/s, {}", + storageGroupName, + dataRegionId, + sequence ? "Sequence" : "Unsequence", + targetTsFileResource.getTsFile().getName(), + String.format("%.2f", costTime), + String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d / costTime), + summary); + + // 在何时关闭 + compactionLogger.close(); + FileUtils.delete(logFile); } catch (Exception e) { isSuccess = false; - // catch throwable to handle OOM errors - if (!(e instanceof InterruptedException)) { - LOGGER.error( - "{}-{} [Compaction] Meet errors in inner space compaction.", - storageGroupName, - dataRegionId, - e); + printLogWhenException(LOGGER, e); + recover(); + } finally { + releaseAllLocks(); + } + return isSuccess; + } + + public void recover() { + try { + if (needRecoverTaskInfoFromLogFile) { + recoverTaskInfoFromLogFile(); + } + if (shouldRollback()) { + rollback(); } else { - // clean the interrupt flag - LOGGER.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId); - Thread.interrupted(); + // That finishTask() is revoked means + finishTask(); } + } catch (Exception e) { + handleRecoverException(e); + } + } - // handle exception - if (isSequence()) { - CompactionExceptionHandler.handleException( - storageGroupName + "-" + dataRegionId, - logFile, - targetTsFileList, - selectedTsFileResourceList, - Collections.emptyList(), - tsFileManager, - timePartition, - true, - isSequence()); - } else { - CompactionExceptionHandler.handleException( - storageGroupName + "-" + dataRegionId, - logFile, - targetTsFileList, - Collections.emptyList(), - selectedTsFileResourceList, - tsFileManager, - timePartition, - true, - isSequence()); + protected void handleRecoverException(Exception e) { + LOGGER.error( + "{} [Compaction][Recover] Failed to recover compaction. TaskInfo: {}, Exception: {}", + dataRegionId, + this, + e); + tsFileManager.setAllowCompaction(false); + LOGGER.error("stop compaction because of exception during recovering"); + // 考虑是否需要将系统设置为 READONLY + CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly); + } + + private void removeTsFileInMemory(List<TsFileResource> resourceList) { + tsFileManager.writeLock("CompactionExceptionHandler"); + try { + for (TsFileResource targetTsFile : resourceList) { + if (targetTsFile == null) { + // target file has been deleted due to empty after compaction + continue; + } + tsFileManager.remove(targetTsFile, targetTsFile.isSeq()); } } finally { - releaseAllLocks(); + tsFileManager.writeUnlock(); } - return isSuccess; + } + + private void insertFilesToTsFileManager(List<TsFileResource> tsFiles) throws IOException { + for (TsFileResource tsFileResource : tsFiles) { + tsFileManager.keepOrderInsert(tsFileResource, tsFileResource.isSeq()); + } + } + + private void rollback() throws Exception { + // if the task has started, + if (recoverMemoryStatus) { + removeTsFileInMemory(Collections.singletonList(targetTsFileResource)); + insertFilesToTsFileManager(selectedTsFileResourceList); + } + deleteCompactionModsFile(selectedTsFileResourceList); + // delete target file + if (targetTsFileResource != null) { + if (!deleteTsFileOnDisk(targetTsFileResource)) { + throw new CompactionRecoverException( + String.format("failed to delete target file %s", targetTsFileResource)); + } + } + } + + private void finishTask() throws IOException { + if (targetTsFileResource.isDeleted() || isTargetTsFileEmpty) { + // it means the target file is empty after compaction + if (targetTsFileResource.remove()) { + throw new CompactionRecoverException( + String.format("failed to delete empty target file %s", targetTsFileResource)); + } + } else { + File targetFile = targetTsFileResource.getTsFile(); + if (targetFile == null || !TsFileUtils.isTsFileComplete(targetTsFileResource.getTsFile())) { + throw new CompactionRecoverException( + String.format("Target file is not completed. %s", targetFile)); + } + if (recoverMemoryStatus) { + targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL); + } + } + if (!deleteTsFilesOnDisk(selectedTsFileResourceList)) { + throw new CompactionRecoverException("source files cannot be deleted successfully"); + } + if (recoverMemoryStatus) { + // 这个地方的统计可能是不准确的 + FileMetrics.getInstance().deleteTsFile(true, selectedTsFileResourceList); + } + deleteCompactionModsFile(selectedTsFileResourceList); + } + + /** + * This method find the File object of given filePath by searching it in every data directory. If + * the file is not found, it will return null. + */ + public File getFileFromDataDirs(String filePath) { + String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs(); + for (String dataDir : dataDirs) { + File f = new File(dataDir, filePath); + if (f.exists()) { + return f; + } + } + return null; + } + + public File getRealTargetFile(TsFileIdentifier targetFileIdentifier) { + File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs(); + File targetFile = + getFileFromDataDirs( + targetFileIdentifier + .getFilePath() + .replace( + IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX)); + return tmpTargetFile != null ? tmpTargetFile : targetFile; + } + + public void deleteCompactionModsFile(List<TsFileResource> tsFileResourceList) throws IOException { + for (TsFileResource seqFile : tsFileResourceList) { + ModificationFile modificationFile = seqFile.getCompactionModFile(); + if (modificationFile.exists()) { + modificationFile.remove(); + } + } + } + + private boolean deleteTsFilesOnDisk(List<TsFileResource> tsFiles) { + for (TsFileResource resource : tsFiles) { + if (!deleteTsFileOnDisk(resource)) { + return false; + } + } + return true; + } + + private boolean deleteTsFileOnDisk(TsFileResource tsFileResource) { + tsFileResource.writeLock(); + try { + if (!tsFileResource.remove()) { + return false; + } + } finally { + targetTsFileResource.writeUnlock(); + } + return true; + } + + private boolean shouldRollback() { + return checkAllSourceFileExists(selectedTsFileResourceList); + } + + private boolean checkAllSourceFileExists(List<TsFileResource> tsFileResources) { + for (TsFileResource tsFileResource : tsFileResources) { + if (!tsFileResource.getTsFile().exists() || !tsFileResource.resourceFileExists()) { + return false; + } + } + return true; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index a733d80941d..d188905a614 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -212,6 +212,16 @@ public class CompactionUtils { } } + public static void deleteCompactionModsFile(List<TsFileResource> tsFileResourceList) + throws IOException { + for (TsFileResource seqFile : tsFileResourceList) { + ModificationFile modificationFile = seqFile.getCompactionModFile(); + if (modificationFile.exists()) { + modificationFile.remove(); + } + } + } + public static boolean deleteTsFilesInDisk( Collection<TsFileResource> mergeTsFiles, String storageGroupName) { logger.info("{} [Compaction] Compaction starts to delete real file ", storageGroupName); @@ -449,6 +459,8 @@ public class CompactionUtils { deleteSourceTsFileAndUpdateFileMetrics(sourceUnseqResourceList, false); } + public static void deleteTsFiles(List<TsFileResource> tsfiles) {} + public static void deleteSourceTsFileAndUpdateFileMetrics( List<TsFileResource> resources, boolean seq) { List<TsFileResource> removeResources = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java index 61edea2d225..031a7f95a7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java @@ -19,9 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log; -import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.tsfile.common.constant.TsFileConstant; - import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -29,16 +26,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.SEQUENCE_NAME_FROM_OLD; import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES; -import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_MERGE_START_FROM_OLD; -import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SEQ_FILES_FROM_OLD; import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES; -import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES_FROM_OLD; import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_TARGET_FILES; -import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_TARGET_FILES_FROM_OLD; -import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_UNSEQ_FILES_FROM_OLD; -import static org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.UNSEQUENCE_NAME_FROM_OLD; public class CompactionLogAnalyzer { @@ -77,91 +67,6 @@ public class CompactionLogAnalyzer { } } - /** - * Analyze inner space compaction log of previous version (<0.13). - * - * @throws IOException if io errors occurred - */ - public void analyzeOldInnerCompactionLog() throws IOException { - isLogFromOld = true; - String currLine; - try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) { - while ((currLine = bufferedReader.readLine()) != null) { - switch (currLine) { - case STR_SOURCE_FILES_FROM_OLD: - currLine = bufferedReader.readLine(); - sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(currLine)); - break; - case STR_TARGET_FILES_FROM_OLD: - currLine = bufferedReader.readLine(); - targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(currLine)); - break; - case STR_SOURCE_FILES: - currLine = bufferedReader.readLine(); - sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(currLine)); - break; - case STR_TARGET_FILES: - currLine = bufferedReader.readLine(); - targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(currLine)); - break; - case SEQUENCE_NAME_FROM_OLD: - case UNSEQUENCE_NAME_FROM_OLD: - break; - default: - break; - } - } - } - } - - /** - * Analyze cross space compaction log of previous version (<0.13). - * - * @throws IOException if io errors occurred - */ - @SuppressWarnings("squid:S135") - public void analyzeOldCrossCompactionLog() throws IOException { - isLogFromOld = true; - String currLine; - boolean isSeqSource = true; - try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) { - while ((currLine = bufferedReader.readLine()) != null) { - if (currLine.equals(STR_UNSEQ_FILES_FROM_OLD)) { - isSeqSource = false; - continue; - } else if (currLine.equals(STR_SEQ_FILES_FROM_OLD)) { - isSeqSource = true; - continue; - } else if (currLine.equals(STR_MERGE_START_FROM_OLD)) { - break; - } - analyzeOldFilePath(isSeqSource, currLine); - } - } - } - - private void analyzeOldFilePath(boolean isSeqSource, String oldFilePath) { - if (oldFilePath.startsWith("root")) { - sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(oldFilePath)); - } else { - sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(oldFilePath)); - } - - int pos = oldFilePath.lastIndexOf(TsFileConstant.TSFILE_SUFFIX); - if (isSeqSource) { - String targetFilePath = - oldFilePath.substring(0, pos) - + TsFileConstant.TSFILE_SUFFIX - + IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD - + oldFilePath.substring(pos + TsFileConstant.TSFILE_SUFFIX.length()); - if (oldFilePath.startsWith("root")) { - targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(targetFilePath)); - } else { - targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(targetFilePath)); - } - } - } - public List<TsFileIdentifier> getSourceFileInfos() { return sourceFileInfos; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java index 9d02418b8ea..248726e34e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java @@ -30,26 +30,22 @@ import java.util.List; public class CompactionLogger implements AutoCloseable { public static final String CROSS_COMPACTION_LOG_NAME_SUFFIX = ".cross-compaction.log"; - public static final String CROSS_COMPACTION_LOG_NAME_FROM_OLD = "merge.log"; public static final String INNER_COMPACTION_LOG_NAME_SUFFIX = ".inner-compaction.log"; - public static final String INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD = ".compaction.log"; public static final String STR_SOURCE_FILES = "source"; public static final String STR_TARGET_FILES = "target"; - public static final String STR_DELETED_TARGET_FILES = "empty"; - public static final String STR_SOURCE_FILES_FROM_OLD = "info-source"; - public static final String STR_TARGET_FILES_FROM_OLD = "info-target"; - public static final String STR_SEQ_FILES_FROM_OLD = "seqFiles"; - public static final String STR_UNSEQ_FILES_FROM_OLD = "unseqFiles"; - public static final String SEQUENCE_NAME_FROM_OLD = "sequence"; - public static final String UNSEQUENCE_NAME_FROM_OLD = "unsequence"; - public static final String STR_MERGE_START_FROM_OLD = "merge start"; + protected List<TsFileIdentifier> sourceFileIdentifiers; + protected List<TsFileIdentifier> targetFileIdentifiers; + protected List<TsFileIdentifier> deletedTargetFileIdentifiers; + protected boolean needRecoverFromLogFile; + private File logFile; private FileOutputStream logStream; public CompactionLogger(File logFile) throws IOException { + this.logFile = logFile; logStream = new FileOutputStream(logFile, true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java new file mode 100644 index 00000000000..15473f68e41 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log; + +import java.util.List; + +public interface ICompactionLogger { + List<String> getSourceFileList(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java new file mode 100644 index 00000000000..e96b5104a10 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log; + +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** MergeLogger records the progress of a merge in file "merge.log" as text lines. */ +public class InnerCompactionLogger extends CompactionLogger { + + private List<TsFileResource> sourceFiles; + private TsFileResource targetFile; + private TsFileResource emptyTargetFile; + + public InnerCompactionLogger(File logFile) throws IOException { + super(logFile); + this.sourceFiles = new ArrayList<>(); + } + + public List<TsFileResource> getSourceFiles() { + return sourceFiles; + } + + public TsFileResource getTargetFile() { + return targetFile; + } + + public TsFileResource getEmptyTargetFile() { + return emptyTargetFile; + } + + public void logSourceFiles(List<TsFileResource> sourceFiles) throws IOException { + this.sourceFiles.addAll(sourceFiles); + logFiles(sourceFiles, STR_SOURCE_FILES); + } + + public void logTargetFile(TsFileResource targetFile) throws IOException { + this.targetFile = targetFile; + logFile(targetFile, STR_TARGET_FILES); + } + + public void logEmptyTargetFile(TsFileResource emptyTargetFile) throws IOException { + this.emptyTargetFile = emptyTargetFile; + logFile(emptyTargetFile, STR_DELETED_TARGET_FILES); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java index e1ac67bd8ee..6b3b5fdddd7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java @@ -51,12 +51,6 @@ public class TsFileIdentifier { public static final int TIME_PARTITION_OFFSET_IN_LOG = 3; public static final int FILE_NAME_OFFSET_IN_LOG = 4; - private static final int LOGICAL_SG_OFFSET_IN_LOG_FROM_OLD = 0; - private static final int DATA_REGION_OFFSET_IN_LOG_FROM_OLD = 1; - private static final int TIME_PARTITION_OFFSET_IN_LOG_FROM_OLD = 2; - private static final int FILE_NAME_OFFSET_IN_LOG_FROM_OLD = 3; - private static final int SEQUENCE_OFFSET_IN_LOG_FROM_OLD = 4; - private static final String SEQUENCE_STR = "sequence"; private static final String UNSEQUENCE_STR = "unsequence"; @@ -123,30 +117,6 @@ public class TsFileIdentifier { splittedFileInfo[FILE_NAME_OFFSET_IN_LOG]); } - /** - * This function generates an instance of CompactionFileIdentifier by parsing the old info string - * from previous version (<0.13) of a tsfile(usually recorded in a compaction.log). Such as - * “root.test.sg 0 0 1-1-0-0.tsfile true" from old cross space compaction log and "root.test.sg 0 - * 0 1-1-0-0.tsfile sequence" from old inner space compaction log. - */ - public static TsFileIdentifier getFileIdentifierFromOldInfoString(String oldInfoString) { - String[] splittedFileInfo = oldInfoString.split(INFO_SEPARATOR); - int length = splittedFileInfo.length; - if (length != 5) { - throw new RuntimeException( - String.format( - "String %s is not a legal file info string from previous version (<0.13)", - oldInfoString)); - } - return new TsFileIdentifier( - splittedFileInfo[LOGICAL_SG_OFFSET_IN_LOG_FROM_OLD], - splittedFileInfo[DATA_REGION_OFFSET_IN_LOG_FROM_OLD], - splittedFileInfo[TIME_PARTITION_OFFSET_IN_LOG_FROM_OLD], - splittedFileInfo[SEQUENCE_OFFSET_IN_LOG_FROM_OLD].equals("true") - || splittedFileInfo[SEQUENCE_OFFSET_IN_LOG_FROM_OLD].equals(SEQUENCE_STR), - splittedFileInfo[FILE_NAME_OFFSET_IN_LOG_FROM_OLD]); - } - @Override public String toString() { return String.format(
