This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch compaction_recover_logger_1017
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/compaction_recover_logger_1017
by this push:
new a0609d3cc2e add compaction task stage
a0609d3cc2e is described below
commit a0609d3cc2e185ff979e9d8292610b0eb6dec951
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Oct 17 23:50:25 2023 +0800
add compaction task stage
---
.../execute/task/AbstractCompactionTask.java | 7 +-
.../execute/task/CrossSpaceCompactionTask.java | 45 +---
.../execute/task/InnerSpaceCompactionTask.java | 264 ++++++++++-----------
.../execute/utils/log/CompactionLogAnalyzer.java | 17 +-
.../execute/utils/log/CompactionLogger.java | 6 +
...pactionLogger.java => CompactionTaskStage.java} | 19 +-
...tionLogger.java => SimpleCompactionLogger.java} | 29 +--
7 files changed, 189 insertions(+), 198 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index eb17f10f8c0..3be3ee32fd7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionTaskStage;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -63,7 +64,7 @@ public abstract class AbstractCompactionTask {
protected long serialId;
protected boolean crossTask;
protected boolean innerSeqTask;
-
+ protected CompactionTaskStage taskStage;
protected long memoryCost = 0L;
protected boolean recoverMemoryStatus;
@@ -312,6 +313,10 @@ public abstract class AbstractCompactionTask {
}
}
+ public void setTaskStage(CompactionTaskStage stage) {
+ this.taskStage = stage;
+ }
+
public boolean isTaskRan() {
return summary.isRan();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index eaaf78df9c1..f03ab79f12c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
@@ -31,6 +30,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subt
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -39,12 +39,12 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.tsfile.utils.TsFileUtils;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -121,6 +121,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
this.emptyTargetTsFileResourceList.add(targetTsFile);
}
}
+ this.taskStage = logAnalyzer.getTaskStage();
}
@Override
@@ -178,11 +179,11 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
+ targetTsfileResourceList.get(0).getTsFile().getName()
+ CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
- try (CompactionLogger compactionLogger = new CompactionLogger(logFile)) {
+ try (SimpleCompactionLogger compactionLogger = new
SimpleCompactionLogger(logFile)) {
// print the path of the temporary file first for priority check
during recovery
- compactionLogger.logFiles(selectedSequenceFiles,
CompactionLogger.STR_SOURCE_FILES);
- compactionLogger.logFiles(selectedUnsequenceFiles,
CompactionLogger.STR_SOURCE_FILES);
- compactionLogger.logFiles(targetTsfileResourceList,
CompactionLogger.STR_TARGET_FILES);
+ compactionLogger.logSourceFiles(selectedSequenceFiles);
+ compactionLogger.logSourceFiles(selectedUnsequenceFiles);
+ compactionLogger.logTargetFiles(targetTsfileResourceList);
compactionLogger.force();
performer.setSourceFiles(selectedSequenceFiles,
selectedUnsequenceFiles);
@@ -209,7 +210,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
for (TsFileResource targetResource : targetTsfileResourceList) {
if (targetResource.isDeleted()) {
emptyTargetTsFileResourceList.add(targetResource);
- compactionLogger.logFile(targetResource,
CompactionLogger.STR_DELETED_TARGET_FILES);
+ compactionLogger.logEmptyTargetFile(targetResource);
compactionLogger.force();
}
}
@@ -284,35 +285,11 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
(selectedSeqFileSize + selectedUnseqFileSize) / 1024.0d /
1024.0d / costTime),
summary);
}
- if (logFile.exists()) {
- FileUtils.delete(logFile);
- }
+ Files.deleteIfExists(logFile.toPath());
} catch (Exception e) {
isSuccess = false;
- // catch throwable to handle OOM errors
- if (!(e instanceof InterruptedException)) {
- LOGGER.error(
- "{}-{} [Compaction] Meet errors in cross space compaction.",
- storageGroupName,
- dataRegionId,
- e);
- } else {
- LOGGER.warn("{}-{} [Compaction] Compaction interrupted",
storageGroupName, dataRegionId);
- // clean the interrupted flag
- Thread.interrupted();
- }
-
- // handle exception
- CompactionExceptionHandler.handleException(
- storageGroupName + "-" + dataRegionId,
- logFile,
- targetTsfileResourceList,
- selectedSequenceFiles,
- selectedUnsequenceFiles,
- tsFileManager,
- timePartition,
- false,
- true);
+ printLogWhenException(LOGGER, e);
+ recover();
} finally {
releaseAllLocks();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 8644b5a054c..180ba500877 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -32,7 +32,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subt
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.InnerCompactionLogger;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
@@ -46,10 +46,9 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.utils.TsFileUtils;
-import org.apache.commons.io.FileUtils;
-
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -65,9 +64,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
protected int sumOfCompactionCount;
protected long maxFileVersion;
protected int maxCompactionCount;
-
private File logFile;
- private InnerCompactionLogger compactionLogger;
protected List<TsFileResource> targetTsFileList;
protected boolean[] isHoldingWriteLock;
protected long maxModsFileSize;
@@ -115,6 +112,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
this.targetTsFileResource = new TsFileResource(targetFileOnDisk);
}
this.isTargetTsFileEmpty = deletedTargetFileIdentifiers.size() > 0;
+ this.taskStage = logAnalyzer.getTaskStage();
}
public InnerSpaceCompactionTask(
@@ -184,141 +182,141 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
targetTsFileResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(
selectedTsFileResourceList, sequence);
- compactionLogger = new InnerCompactionLogger(logFile);
-
- // Here is tmpTargetFile, which is xxx.target
- targetTsFileList = new
ArrayList<>(Collections.singletonList(targetTsFileResource));
- compactionLogger.logSourceFiles(selectedTsFileResourceList);
- compactionLogger.logTargetFile(targetTsFileResource);
- compactionLogger.force();
- LOGGER.info(
- "{}-{} [Compaction] compaction with {}",
- storageGroupName,
- dataRegionId,
- selectedTsFileResourceList);
-
- // carry out the compaction
- performer.setSourceFiles(selectedTsFileResourceList);
- // As elements in targetFiles may be removed in
ReadPointCompactionPerformer, we should use
- // a
- // mutable list instead of Collections.singletonList()
- performer.setTargetFiles(targetTsFileList);
- performer.setSummary(summary);
- performer.perform();
-
- CompactionUtils.updateProgressIndex(
- targetTsFileList, selectedTsFileResourceList,
Collections.emptyList());
- CompactionUtils.moveTargetFile(targetTsFileList, true, storageGroupName
+ "-" + dataRegionId);
-
- LOGGER.info(
- "{}-{} [InnerSpaceCompactionTask] start to rename mods file",
- storageGroupName,
- dataRegionId);
- CompactionUtils.combineModsInInnerCompaction(
- selectedTsFileResourceList, targetTsFileResource);
-
- if (Thread.currentThread().isInterrupted() || summary.isCancel()) {
- throw new InterruptedException(
- String.format("%s-%s [Compaction] abort", storageGroupName,
dataRegionId));
- }
-
- // replace the old files with new file, the new is in same position as
the old
- if (sequence) {
- tsFileManager.replace(
- selectedTsFileResourceList,
- Collections.emptyList(),
- targetTsFileList,
- timePartition,
- true);
- } else {
- tsFileManager.replace(
- Collections.emptyList(),
- selectedTsFileResourceList,
- targetTsFileList,
- timePartition,
- false);
- }
-
- if (targetTsFileResource.isDeleted()) {
- compactionLogger.logEmptyTargetFile(targetTsFileResource);
- isTargetTsFileEmpty = true;
+ try (SimpleCompactionLogger compactionLogger = new
SimpleCompactionLogger(logFile)) {
+ // Here is tmpTargetFile, which is xxx.target
+ targetTsFileList = new
ArrayList<>(Collections.singletonList(targetTsFileResource));
+ compactionLogger.logSourceFiles(selectedTsFileResourceList);
+ compactionLogger.logTargetFile(targetTsFileResource);
compactionLogger.force();
- }
+ LOGGER.info(
+ "{}-{} [Compaction] compaction with {}",
+ storageGroupName,
+ dataRegionId,
+ selectedTsFileResourceList);
+
+ // carry out the compaction
+ performer.setSourceFiles(selectedTsFileResourceList);
+ // As elements in targetFiles may be removed in
ReadPointCompactionPerformer, we should use
+ // a
+ // mutable list instead of Collections.singletonList()
+ performer.setTargetFiles(targetTsFileList);
+ performer.setSummary(summary);
+ performer.perform();
+
+ CompactionUtils.updateProgressIndex(
+ targetTsFileList, selectedTsFileResourceList,
Collections.emptyList());
+ CompactionUtils.moveTargetFile(
+ targetTsFileList, true, storageGroupName + "-" + dataRegionId);
+
+ LOGGER.info(
+ "{}-{} [InnerSpaceCompactionTask] start to rename mods file",
+ storageGroupName,
+ dataRegionId);
+ CompactionUtils.combineModsInInnerCompaction(
+ selectedTsFileResourceList, targetTsFileResource);
+
+ if (Thread.currentThread().isInterrupted() || summary.isCancel()) {
+ throw new InterruptedException(
+ String.format("%s-%s [Compaction] abort", storageGroupName,
dataRegionId));
+ }
- CompactionValidator validator = CompactionValidator.getInstance();
- if (!validator.validateCompaction(
- tsFileManager, targetTsFileList, storageGroupName, timePartition,
!sequence)) {
- LOGGER.error(
- "Failed to pass compaction validation, source files is: {}, target
files is {}",
- selectedTsFileResourceList,
- targetTsFileList);
- throw new CompactionValidationFailedException("Failed to pass
compaction validation");
- }
+ // replace the old files with new file, the new is in same position as
the old
+ if (sequence) {
+ tsFileManager.replace(
+ selectedTsFileResourceList,
+ Collections.emptyList(),
+ targetTsFileList,
+ timePartition,
+ true);
+ } else {
+ tsFileManager.replace(
+ Collections.emptyList(),
+ selectedTsFileResourceList,
+ targetTsFileList,
+ timePartition,
+ false);
+ }
- LOGGER.info(
- "{}-{} [Compaction] Compacted target files, try to get the write
lock of source files",
- storageGroupName,
- dataRegionId);
+ if (targetTsFileResource.isDeleted()) {
+ compactionLogger.logEmptyTargetFile(targetTsFileResource);
+ isTargetTsFileEmpty = true;
+ compactionLogger.force();
+ }
- // release the read lock of all source files, and get the write lock of
them to delete them
- for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
- selectedTsFileResourceList.get(i).writeLock();
- isHoldingWriteLock[i] = true;
- }
+ CompactionValidator validator = CompactionValidator.getInstance();
+ if (!validator.validateCompaction(
+ tsFileManager, targetTsFileList, storageGroupName, timePartition,
!sequence)) {
+ LOGGER.error(
+ "Failed to pass compaction validation, source files is: {},
target files is {}",
+ selectedTsFileResourceList,
+ targetTsFileList);
+ throw new CompactionValidationFailedException("Failed to pass
compaction validation");
+ }
- if (targetTsFileResource.getTsFile().exists()
- && targetTsFileResource.getTsFile().length()
- < TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES)
{
- // the file size is smaller than magic string and version number
- throw new TsFileNotCompleteException(
- String.format(
- "target file %s is smaller than magic string and version
number size",
- targetTsFileResource));
- }
+ LOGGER.info(
+ "{}-{} [Compaction] Compacted target files, try to get the write
lock of source files",
+ storageGroupName,
+ dataRegionId);
+ // release the read lock of all source files, and get the write lock
of them to delete them
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ selectedTsFileResourceList.get(i).writeLock();
+ isHoldingWriteLock[i] = true;
+ }
- LOGGER.info(
- "{}-{} [Compaction] compaction finish, start to delete old files",
- storageGroupName,
- dataRegionId);
-
CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(selectedTsFileResourceList,
sequence);
- CompactionUtils.deleteModificationForSourceFile(
- selectedTsFileResourceList, storageGroupName + "-" + dataRegionId);
-
- // inner space compaction task has only one target file
- if (!targetTsFileResource.isDeleted()) {
- FileMetrics.getInstance()
- .addTsFile(
- targetTsFileResource.getDatabaseName(),
- targetTsFileResource.getDataRegionId(),
- targetTsFileResource.getTsFile().length(),
- sequence,
- targetTsFileResource.getTsFile().getName());
-
- // set target resource to CLOSED, so that it can be selected to compact
- targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
- } else {
- // target resource is empty after compaction, then delete it
- targetTsFileResource.remove();
+ if (targetTsFileResource.getTsFile().exists()
+ && targetTsFileResource.getTsFile().length()
+ < TSFileConfig.MAGIC_STRING.getBytes().length * 2L +
Byte.BYTES) {
+ // the file size is smaller than magic string and version number
+ throw new TsFileNotCompleteException(
+ String.format(
+ "target file %s is smaller than magic string and version
number size",
+ targetTsFileResource));
+ }
+
+ LOGGER.info(
+ "{}-{} [Compaction] compaction finish, start to delete old files",
+ storageGroupName,
+ dataRegionId);
+ CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(
+ selectedTsFileResourceList, sequence);
+ CompactionUtils.deleteModificationForSourceFile(
+ selectedTsFileResourceList, storageGroupName + "-" + dataRegionId);
+
+ // inner space compaction task has only one target file
+ if (!targetTsFileResource.isDeleted()) {
+ FileMetrics.getInstance()
+ .addTsFile(
+ targetTsFileResource.getDatabaseName(),
+ targetTsFileResource.getDataRegionId(),
+ targetTsFileResource.getTsFile().length(),
+ sequence,
+ targetTsFileResource.getTsFile().getName());
+
+ // set target resource to CLOSED, so that it can be selected to
compact
+ targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+ } else {
+ // target resource is empty after compaction, then delete it
+ targetTsFileResource.remove();
+ }
+ CompactionMetrics.getInstance().recordSummaryInfo(summary);
+
+ double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
+ LOGGER.info(
+ "{}-{} [Compaction] {} InnerSpaceCompaction task finishes
successfully, "
+ + "target file is {},"
+ + "time cost is {} s, "
+ + "compaction speed is {} MB/s, {}",
+ storageGroupName,
+ dataRegionId,
+ sequence ? "Sequence" : "Unsequence",
+ targetTsFileResource.getTsFile().getName(),
+ String.format("%.2f", costTime),
+ String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d /
costTime),
+ summary);
+
+ // 在何时关闭
}
- CompactionMetrics.getInstance().recordSummaryInfo(summary);
-
- double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
- LOGGER.info(
- "{}-{} [Compaction] {} InnerSpaceCompaction task finishes
successfully, "
- + "target file is {},"
- + "time cost is {} s, "
- + "compaction speed is {} MB/s, {}",
- storageGroupName,
- dataRegionId,
- sequence ? "Sequence" : "Unsequence",
- targetTsFileResource.getTsFile().getName(),
- String.format("%.2f", costTime),
- String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d /
costTime),
- summary);
-
- // 在何时关闭
- compactionLogger.close();
- FileUtils.delete(logFile);
+ Files.deleteIfExists(logFile.toPath());
} catch (Exception e) {
isSuccess = false;
printLogWhenException(LOGGER, e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
index 031a7f95a7a..0baa7117392 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
@@ -25,6 +25,7 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Stream;
import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES;
import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES;
@@ -36,6 +37,7 @@ public class CompactionLogAnalyzer {
private final List<TsFileIdentifier> sourceFileInfos = new ArrayList<>();
private final List<TsFileIdentifier> targetFileInfos = new ArrayList<>();
private final List<TsFileIdentifier> deletedTargetFileInfos = new
ArrayList<>();
+ private CompactionTaskStage taskStage;
private boolean isLogFromOld = false;
public CompactionLogAnalyzer(File logFile) {
@@ -48,9 +50,10 @@ public class CompactionLogAnalyzer {
* @throws IOException if io errors occurred
*/
public void analyze() throws IOException {
- String currLine;
try (BufferedReader bufferedReader = new BufferedReader(new
FileReader(logFile))) {
+ String currLine;
while ((currLine = bufferedReader.readLine()) != null) {
+ final String lineValue = currLine;
String fileInfo;
if (currLine.startsWith(STR_SOURCE_FILES)) {
fileInfo = currLine.replaceFirst(STR_SOURCE_FILES +
TsFileIdentifier.INFO_SEPARATOR, "");
@@ -58,10 +61,16 @@ public class CompactionLogAnalyzer {
} else if (currLine.startsWith(STR_TARGET_FILES)) {
fileInfo = currLine.replaceFirst(STR_TARGET_FILES +
TsFileIdentifier.INFO_SEPARATOR, "");
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo));
- } else {
+ } else if (currLine.startsWith(STR_DELETED_TARGET_FILES)) {
fileInfo =
currLine.replaceFirst(STR_DELETED_TARGET_FILES +
TsFileIdentifier.INFO_SEPARATOR, "");
deletedTargetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo));
+ } else if (Stream.of(CompactionTaskStage.values())
+ .anyMatch(stage -> lineValue.startsWith(stage.name()))) {
+ taskStage = CompactionTaskStage.valueOf(currLine);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("unknown compaction log line: %s", currLine));
}
}
}
@@ -79,7 +88,7 @@ public class CompactionLogAnalyzer {
return deletedTargetFileInfos;
}
- public boolean isLogFromOld() {
- return isLogFromOld;
+ public CompactionTaskStage getTaskStage() {
+ return taskStage;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
index 248726e34e1..2ded4a27b73 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
@@ -60,6 +60,12 @@ public class CompactionLogger implements AutoCloseable {
}
}
+ public void logTaskStage(CompactionTaskStage stage) throws IOException {
+ logStream.write(stage.name().getBytes());
+ logStream.write(System.lineSeparator().getBytes());
+ logStream.flush();
+ }
+
public void logFile(TsFileResource tsFile, String flag) throws IOException {
String log =
flag
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java
similarity index 63%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java
index 15473f68e41..9a549a1790b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/ICompactionLogger.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java
@@ -19,8 +19,21 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log;
-import java.util.List;
+public enum CompactionTaskStage {
+ NOT_STARTED("task does not started"),
+ STARTED("task has started"),
+ TARGET_FILE_GENERATED("target file has been generated completely"),
+ TARGET_FILE_REPLACED("target file has been replaced in TsFileManager"),
+ SOURCE_FILE_CLEANED("source files are cleaned up"),
+ FINISHED("task finished");
-public interface ICompactionLogger {
- List<String> getSourceFileList();
+ private final String descirption;
+
+ CompactionTaskStage(String descirption) {
+ this.descirption = descirption;
+ }
+
+ public String getDescription() {
+ return this.descirption;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java
similarity index 70%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java
index e96b5104a10..d0769f5ba5b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/InnerCompactionLogger.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java
@@ -23,45 +23,28 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
/** MergeLogger records the progress of a merge in file "merge.log" as text
lines. */
-public class InnerCompactionLogger extends CompactionLogger {
+public class SimpleCompactionLogger extends CompactionLogger {
- private List<TsFileResource> sourceFiles;
- private TsFileResource targetFile;
- private TsFileResource emptyTargetFile;
-
- public InnerCompactionLogger(File logFile) throws IOException {
+ public SimpleCompactionLogger(File logFile) throws IOException {
super(logFile);
- this.sourceFiles = new ArrayList<>();
- }
-
- public List<TsFileResource> getSourceFiles() {
- return sourceFiles;
- }
-
- public TsFileResource getTargetFile() {
- return targetFile;
- }
-
- public TsFileResource getEmptyTargetFile() {
- return emptyTargetFile;
}
public void logSourceFiles(List<TsFileResource> sourceFiles) throws
IOException {
- this.sourceFiles.addAll(sourceFiles);
logFiles(sourceFiles, STR_SOURCE_FILES);
}
public void logTargetFile(TsFileResource targetFile) throws IOException {
- this.targetFile = targetFile;
logFile(targetFile, STR_TARGET_FILES);
}
+ public void logTargetFiles(List<TsFileResource> targetFiles) throws
IOException {
+ logFiles(targetFiles, STR_TARGET_FILES);
+ }
+
public void logEmptyTargetFile(TsFileResource emptyTargetFile) throws
IOException {
- this.emptyTargetFile = emptyTargetFile;
logFile(emptyTargetFile, STR_DELETED_TARGET_FILES);
}
}