This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 212a72d0a01 Refactor Compaction recover and exception handle logic
(#11349)
212a72d0a01 is described below
commit 212a72d0a01d186a0eebf3e66ebd1fe73a1c6794
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Fri Oct 20 14:08:28 2023 +0800
Refactor Compaction recover and exception handle logic (#11349)
---
.../exception/CompactionRecoverException.java | 30 ++
.../execute/recover/CompactionRecoverManager.java | 20 -
.../execute/recover/CompactionRecoverTask.java | 238 +--------
.../execute/task/AbstractCompactionTask.java | 138 ++++-
.../execute/task/CrossSpaceCompactionTask.java | 164 ++++--
.../execute/task/InnerSpaceCompactionTask.java | 189 ++++---
.../execute/utils/log/CompactionLogAnalyzer.java | 121 +----
.../execute/utils/log/CompactionLogger.java | 18 +-
.../execute/utils/log/CompactionTaskStage.java | 39 ++
.../execute/utils/log/SimpleCompactionLogger.java | 50 ++
.../execute/utils/log/TsFileIdentifier.java | 30 --
.../compaction/AbstractCompactionTest.java | 3 +
.../CrossSpaceCompactionWithFastPerformerTest.java | 26 +-
...sSpaceCompactionWithReadPointPerformerTest.java | 26 +-
...eCrossSpaceCompactionRecoverCompatibleTest.java | 413 ---------------
.../SizeTieredCompactionRecoverCompatibleTest.java | 276 ----------
.../recover/SizeTieredCompactionRecoverTest.java | 588 ++++++++++++++-------
17 files changed, 966 insertions(+), 1403 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionRecoverException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionRecoverException.java
new file mode 100644
index 00000000000..3c63e3d602d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionRecoverException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;
+
+/**
+ * Unchecked exception thrown by compaction tasks when a step of recovering a compaction (rolling
+ * it back or finishing it) cannot be completed.
+ */
+public class CompactionRecoverException extends RuntimeException {
+
+  /**
+   * Creates an exception that only carries a description of the failure.
+   *
+   * @param message description of the recovery step that failed
+   */
+  public CompactionRecoverException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception that carries a description of the failure and its underlying cause.
+   *
+   * @param message description of the recovery step that failed
+   * @param cause the exception that made the recovery step fail
+   */
+  public CompactionRecoverException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java
index 93576c2fecc..23e078a1f02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverManager.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,13 +60,11 @@ public class CompactionRecoverManager {
public void recoverInnerSpaceCompaction(boolean isSequence) {
logger.info("recovering inner compaction");
- recoverCompactionBefore013(true);
recoverCompaction(true, isSequence);
}
public void recoverCrossSpaceCompaction() {
logger.info("recovering cross compaction");
- recoverCompactionBefore013(false);
recoverCompaction(false, true);
}
@@ -138,21 +135,4 @@ public class CompactionRecoverManager {
.doCompaction();
}
}
-
- /** Check whether there is old compaction log from previous version (<0.13)
and recover it. */
- private void recoverCompactionBefore013(boolean isInnerSpace) {
- String oldLogName =
- isInnerSpace
- ? logicalStorageGroupName +
CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD
- : CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD;
- File logFileFromOld =
-
FSFactoryProducer.getFSFactory().getFile(tsFileManager.getStorageGroupDir(),
oldLogName);
-
- if (logFileFromOld.exists()) {
- logger.info("Calling compaction task to recover from previous version.");
- new CompactionRecoverTask(
- logicalStorageGroupName, dataRegionId, tsFileManager,
logFileFromOld, isInnerSpace)
- .doCompaction();
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java
index ac53ad571d3..e04fd69b3c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/recover/CompactionRecoverTask.java
@@ -25,19 +25,12 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
-import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
-import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.TsFileUtils;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -82,16 +75,7 @@ public class CompactionRecoverTask {
fullStorageGroupName,
compactionLogFile);
CompactionLogAnalyzer logAnalyzer = new
CompactionLogAnalyzer(compactionLogFile);
- CompactionRecoverFromOld compactionRecoverFromOld = new
CompactionRecoverFromOld();
- if (isInnerSpace &&
compactionRecoverFromOld.isInnerCompactionLogBefore013()) {
- // inner compaction log from previous version (<0.13)
- logAnalyzer.analyzeOldInnerCompactionLog();
- } else if (!isInnerSpace &&
compactionRecoverFromOld.isCrossCompactionLogBefore013()) {
- // cross compaction log from previous version (<0.13)
- logAnalyzer.analyzeOldCrossCompactionLog();
- } else {
- logAnalyzer.analyze();
- }
+ logAnalyzer.analyze();
List<TsFileIdentifier> sourceFileIdentifiers =
logAnalyzer.getSourceFileInfos();
List<TsFileIdentifier> targetFileIdentifiers =
logAnalyzer.getTargetFileInfos();
List<TsFileIdentifier> deletedTargetFileIdentifiers =
@@ -115,24 +99,13 @@ public class CompactionRecoverTask {
}
if (isAllSourcesFileExisted) {
- if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
- recoverSuccess =
-
compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistBefore013(
- targetFileIdentifiers);
- } else {
- recoverSuccess =
- handleWithAllSourceFilesExist(targetFileIdentifiers,
sourceFileIdentifiers);
- }
+ recoverSuccess =
+ handleWithAllSourceFilesExist(targetFileIdentifiers,
sourceFileIdentifiers);
+
} else {
- if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
- recoverSuccess =
-
compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostBefore013(
- targetFileIdentifiers, sourceFileIdentifiers);
- } else {
- recoverSuccess =
- handleWithSomeSourceFilesLost(
- targetFileIdentifiers, deletedTargetFileIdentifiers,
sourceFileIdentifiers);
- }
+ recoverSuccess =
+ handleWithSomeSourceFilesLost(
+ targetFileIdentifiers, deletedTargetFileIdentifiers,
sourceFileIdentifiers);
}
}
} catch (IOException e) {
@@ -358,201 +331,4 @@ public class CompactionRecoverTask {
}
return true;
}
-
- /**
- * Used to check whether it is recoverd from last version (<0.13) and
perform corresponding
- * process.
- */
- private class CompactionRecoverFromOld {
-
- /** Return whether cross compaction log file is from previous version
(<0.13). */
- private boolean isCrossCompactionLogBefore013() {
- return compactionLogFile
- .getName()
- .equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
- }
-
- /** Return whether inner compaction log file is from previous version
(<0.13). */
- private boolean isInnerCompactionLogBefore013() {
- return
compactionLogFile.getName().startsWith(tsFileManager.getStorageGroupName());
- }
-
- /** Delete tmp target file and compaction mods file. */
- private boolean handleCrossCompactionWithAllSourceFilesExistBefore013(
- List<TsFileIdentifier> targetFileIdentifiers) {
- // delete tmp target file
- for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
- // xxx.tsfile.merge
- File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
- if (tmpTargetFile != null) {
- try {
- Files.delete(tmpTargetFile.toPath());
- } catch (IOException e) {
- logger.error(
- "{} [Compaction][Recover] failed to remove file {}, exception:
{}",
- fullStorageGroupName,
- tmpTargetFile,
- e);
- }
- File chunkMetadataTempFile =
- new File(
- tmpTargetFile.getAbsolutePath() +
TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
- if (chunkMetadataTempFile.exists()) {
- try {
- Files.delete(chunkMetadataTempFile.toPath());
- } catch (IOException e) {
- logger.error(
- "{} [Compaction][Recover] failed to remove file {},
exception: {}",
- fullStorageGroupName,
- chunkMetadataTempFile,
- e);
- }
- }
- }
- }
-
- // delete compaction mods file
- File compactionModsFileFromOld =
- new File(
- tsFileManager.getStorageGroupDir()
- + File.separator
- + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
- return checkAndDeleteFile(compactionModsFileFromOld);
- }
-
- /**
- * 1. If target file does not exist, then move .merge file to target file
<br>
- * 2. If target resource file does not exist, then serialize it. <br>
- * 3. Append merging modification to target mods file and delete merging
mods file. <br>
- * 4. Delete source files and .merge file. <br>
- */
- @SuppressWarnings("squid:S3776")
- private boolean handleCrossCompactionWithSomeSourceFilesLostBefore013(
- List<TsFileIdentifier> targetFileIdentifiers,
- List<TsFileIdentifier> sourceFileIdentifiers) {
- try {
- File compactionModsFileFromOld =
- new File(
- tsFileManager.getStorageGroupDir()
- + File.separator
- +
IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
- List<TsFileResource> targetFileResources = new ArrayList<>();
- for (int i = 0; i < sourceFileIdentifiers.size(); i++) {
- TsFileIdentifier sourceFileIdentifier = sourceFileIdentifiers.get(i);
- if (sourceFileIdentifier.isSequence()) {
- File tmpTargetFile =
targetFileIdentifiers.get(i).getFileFromDataDirs();
- File targetFile = null;
-
- // move tmp target file to target file if not exist
- if (tmpTargetFile != null) {
- // move tmp target file to target file
- String sourceFilePath =
- tmpTargetFile
- .getPath()
- .replace(
- TsFileConstant.TSFILE_SUFFIX
- +
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
- TsFileConstant.TSFILE_SUFFIX);
- targetFile = TsFileNameGenerator.increaseCrossCompactionCnt(new
File(sourceFilePath));
- FSFactoryProducer.getFSFactory().moveFile(tmpTargetFile,
targetFile);
- } else {
- // target file must exist
- File file =
- TsFileNameGenerator.increaseCrossCompactionCnt(
- new File(
- targetFileIdentifiers
- .get(i)
- .getFilePath()
- .replace(
- TsFileConstant.TSFILE_SUFFIX
- +
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
- TsFileConstant.TSFILE_SUFFIX)));
-
- targetFile = getFileFromDataDirs(file.getPath());
- }
- if (targetFile == null) {
- logger.error(
- "{} [Compaction][Recover] target file of source seq file {} "
- + "does not exist (<0.13).",
- fullStorageGroupName,
- sourceFileIdentifier.getFilePath());
- return false;
- }
-
- // serialize target resource file if not exist
- TsFileResource targetResource = new TsFileResource(targetFile);
- if (!targetResource.resourceFileExists()) {
- try (TsFileSequenceReader reader =
- new TsFileSequenceReader(targetFile.getAbsolutePath())) {
- FileLoaderUtils.updateTsFileResource(reader, targetResource);
- }
- targetResource.serialize();
- }
-
- targetFileResources.add(targetResource);
-
- // append compaction modifications to target mods file and delete
compaction mods file
- if (compactionModsFileFromOld.exists()) {
- ModificationFile compactionModsFile =
- new ModificationFile(compactionModsFileFromOld.getPath());
- appendCompactionModificationsBefore013(targetResource,
compactionModsFile);
- }
-
- // delete tmp target file
- if (!checkAndDeleteFile(tmpTargetFile)) {
- return false;
- }
- }
-
- // delete source tsfile
- File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
- if (!checkAndDeleteFile(sourceFile)) {
- return false;
- }
-
- // delete source resource file
- sourceFile =
- getFileFromDataDirs(
- sourceFileIdentifier.getFilePath() +
TsFileResource.RESOURCE_SUFFIX);
- if (!checkAndDeleteFile(sourceFile)) {
- return false;
- }
-
- // delete source mods file
- sourceFile =
- getFileFromDataDirs(
- sourceFileIdentifier.getFilePath() +
ModificationFile.FILE_SUFFIX);
- if (!checkAndDeleteFile(sourceFile)) {
- return false;
- }
- }
-
- // delete compaction mods file
- if (!checkAndDeleteFile(compactionModsFileFromOld)) {
- return false;
- }
- } catch (IOException e) {
- logger.error(
- "{} [Compaction][Recover] fail to handle with some source files
lost from old version.",
- fullStorageGroupName,
- e);
- return false;
- }
-
- return true;
- }
-
- private void appendCompactionModificationsBefore013(
- TsFileResource resource, ModificationFile compactionModsFile) throws
IOException {
- if (compactionModsFile != null) {
- for (Modification modification :
compactionModsFile.getModifications()) {
- // we have to set modification offset to MAX_VALUE, as the offset of
source chunk may
- // change after compaction
- modification.setFileOffset(Long.MAX_VALUE);
- resource.getModFile().write(modification);
- }
- resource.getModFile().close();
- }
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 02412723c48..b1bae1c54f3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -19,15 +19,27 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionTaskStage;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -39,6 +51,9 @@ import java.util.List;
* finished. The future returns the {@link CompactionTaskSummary} of this task
execution.
*/
public abstract class AbstractCompactionTask {
+ protected static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+
protected String dataRegionId;
protected String storageGroupName;
protected long timePartition;
@@ -49,9 +64,10 @@ public abstract class AbstractCompactionTask {
protected long serialId;
protected boolean crossTask;
protected boolean innerSeqTask;
-
+ protected CompactionTaskStage taskStage;
protected long memoryCost = 0L;
+ protected boolean recoverMemoryStatus;
protected CompactionTaskType compactionTaskType;
protected AbstractCompactionTask(
@@ -108,6 +124,22 @@ public abstract class AbstractCompactionTask {
protected abstract boolean doCompaction();
+ protected abstract void recover();
+
+  /**
+   * Logs an exception that was caught while this task was running.
+   *
+   * <p>An {@link InterruptedException} is reported as a warning and the interrupt flag of the
+   * current thread is set again. Any other exception is logged as an error together with its
+   * stack trace.
+   *
+   * @param logger the logger to write to
+   * @param e the exception caught by the task
+   */
+  protected void printLogWhenException(Logger logger, Exception e) {
+    if (e instanceof InterruptedException) {
+      logger.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId);
+      Thread.currentThread().interrupt();
+      return;
+    }
+    // Arguments follow the placeholder order: "<storage group>-<data region>" first, then the
+    // task type. The trailing exception has no placeholder so it is logged with its stack trace.
+    logger.error(
+        "{}-{} [Compaction] Meet errors in {} task.",
+        storageGroupName,
+        dataRegionId,
+        compactionTaskType,
+        e);
+  }
+
public boolean start() {
boolean isSuccess = false;
summary.start();
@@ -182,6 +214,110 @@ public abstract class AbstractCompactionTask {
}
}
+ /**
+ * Swaps files in the in-memory TsFile lists: {@code removedTsFiles} are removed first and
+ * {@code addedTsFiles} are inserted afterwards, both while holding the TsFileManager write lock.
+ *
+ * @param removedTsFiles resources to remove from the TsFileManager
+ * @param addedTsFiles resources to insert into the TsFileManager
+ * @throws IOException propagated from insertFilesToTsFileManager
+ */
+ protected void replaceTsFileInMemory(
+ List<TsFileResource> removedTsFiles, List<TsFileResource> addedTsFiles)
throws IOException {
+ tsFileManager.writeLock("compactionRollBack");
+ try {
+ removeTsFileInMemory(removedTsFiles);
+ insertFilesToTsFileManager(addedTsFiles);
+ } finally {
+ // release the write lock even when the swap above throws
+ tsFileManager.writeUnlock();
+ }
+ }
+
+  /**
+   * Tells whether every given resource still has both its TsFile and its resource file.
+   *
+   * @param tsFileResources the resources to check
+   * @return {@code true} if, for each resource, the TsFile exists and the resource file exists
+   *     (also {@code true} for an empty list); {@code false} as soon as one of them is missing
+   */
+  protected boolean checkAllSourceFileExists(List<TsFileResource> tsFileResources) {
+    return tsFileResources.stream()
+        .allMatch(resource -> resource.getTsFile().exists() && resource.resourceFileExists());
+  }
+
+  /**
+   * Handles a failure that happened while recovering this task: the failure is logged,
+   * compaction is disallowed on the TsFileManager and the node status is set to
+   * {@link NodeStatus#ReadOnly}.
+   *
+   * @param e the exception raised during recovery
+   */
+  protected void handleRecoverException(Exception e) {
+    // The exception is the trailing argument, so SLF4J logs it with its stack trace; it does not
+    // fill a "{}" placeholder, which is why the message has none for it.
+    LOGGER.error(
+        "{} [Compaction][Recover] Failed to recover compaction. TaskInfo: {}",
+        dataRegionId,
+        this,
+        e);
+    tsFileManager.setAllowCompaction(false);
+    LOGGER.error("stop compaction because of exception during recovering");
+    CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
+  }
+
+  /**
+   * Inserts the given resources into the TsFileManager, keeping order, and skips every resource
+   * that reports it is already in a list.
+   *
+   * @param tsFiles the resources to insert
+   * @throws IOException propagated from the insertion into the TsFileManager
+   */
+  protected void insertFilesToTsFileManager(List<TsFileResource> tsFiles) throws IOException {
+    for (TsFileResource resource : tsFiles) {
+      if (resource.isFileInList()) {
+        continue;
+      }
+      tsFileManager.keepOrderInsert(resource, resource.isSeq());
+    }
+  }
+
+  /**
+   * Removes the given resources from the TsFileManager.
+   *
+   * @param resourceList the resources to remove; {@code null} entries are skipped
+   */
+  protected void removeTsFileInMemory(List<TsFileResource> resourceList) {
+    for (TsFileResource resource : resourceList) {
+      // a null entry stands for a target file that was deleted because it was empty after
+      // compaction
+      if (resource != null) {
+        tsFileManager.remove(resource, resource.isSeq());
+      }
+    }
+  }
+
+  /**
+   * Locates the file of a compaction target on disk.
+   *
+   * <p>The file named by the identifier itself is looked up first. The path with {@code suffix}
+   * replaced by {@link TsFileConstant#TSFILE_SUFFIX} is looked up in the data directories as
+   * well, and is returned only when the first lookup yields {@code null}.
+   *
+   * @param targetFileIdentifier identifier of the target file
+   * @param suffix the suffix in the identifier's path that is replaced by the TsFile suffix
+   * @return the file from the first lookup if it is not {@code null}, otherwise the result of the
+   *     second lookup, which may itself be {@code null}
+   */
+  public File getRealTargetFile(TsFileIdentifier targetFileIdentifier, String suffix) {
+    File fileOfIdentifier = targetFileIdentifier.getFileFromDataDirs();
+    String renamedPath =
+        targetFileIdentifier.getFilePath().replace(suffix, TsFileConstant.TSFILE_SUFFIX);
+    File renamedFile = getFileFromDataDirs(renamedPath);
+    if (fileOfIdentifier != null) {
+      return fileOfIdentifier;
+    }
+    return renamedFile;
+  }
+
+  /**
+   * Searches every local data directory for the given relative path.
+   *
+   * @param filePath the path to resolve against each data directory
+   * @return the first existing file found, or {@code null} if no data directory contains it
+   */
+  public File getFileFromDataDirs(String filePath) {
+    for (String dataDir : IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs()) {
+      File candidate = new File(dataDir, filePath);
+      if (candidate.exists()) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Removes the compaction modification file of each given resource, if that file exists.
+   *
+   * @param tsFileResourceList the resources whose compaction mods files are removed
+   * @throws IOException propagated from removing a modification file
+   */
+  protected void deleteCompactionModsFile(List<TsFileResource> tsFileResourceList)
+      throws IOException {
+    for (TsFileResource resource : tsFileResourceList) {
+      ModificationFile compactionMods = resource.getCompactionModFile();
+      if (!compactionMods.exists()) {
+        continue;
+      }
+      compactionMods.remove();
+    }
+  }
+
+  /**
+   * Deletes the given resources one by one, stopping at the first one that cannot be deleted.
+   *
+   * @param tsFiles the resources to delete
+   * @return {@code true} if every resource was deleted (or the list is empty); {@code false} as
+   *     soon as one deletion reports failure, in which case later resources are left untouched
+   */
+  protected boolean deleteTsFilesOnDisk(List<TsFileResource> tsFiles) {
+    for (TsFileResource tsFile : tsFiles) {
+      boolean deleted = deleteTsFileOnDisk(tsFile);
+      if (!deleted) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+ /**
+ * Removes a single resource while holding that resource's write lock.
+ *
+ * @param tsFileResource the resource to remove
+ * @return the value returned by {@code tsFileResource.remove()}; deleteTsFilesOnDisk treats
+ *     {@code false} as a failed deletion
+ */
+ protected boolean deleteTsFileOnDisk(TsFileResource tsFileResource) {
+ tsFileResource.writeLock();
+ try {
+ return tsFileResource.remove();
+ } finally {
+ // release the resource's write lock whether or not the removal succeeded
+ tsFileResource.writeUnlock();
+ }
+ }
+
+  /**
+   * Sets the stage recorded for this task.
+   *
+   * @param taskStage the stage to record
+   */
+  public void setTaskStage(CompactionTaskStage taskStage) {
+    this.taskStage = taskStage;
+  }
+
public boolean isTaskRan() {
return summary.isRan();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 4f9e23614ce..b083778a81d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -22,41 +22,48 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class CrossSpaceCompactionTask extends AbstractCompactionTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
protected List<TsFileResource> selectedSequenceFiles;
protected List<TsFileResource> selectedUnsequenceFiles;
- protected TsFileResourceList seqTsFileResourceList;
- protected TsFileResourceList unseqTsFileResourceList;
private File logFile;
protected List<TsFileResource> targetTsfileResourceList;
+ private List<TsFileResource> emptyTargetTsFileResourceList;
protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
protected double selectedSeqFileSize = 0;
protected double selectedUnseqFileSize = 0;
+ protected boolean needRecoverTaskInfoFromLogFile;
+
@SuppressWarnings("squid:S107")
public CrossSpaceCompactionTask(
long timePartition,
@@ -74,10 +81,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
serialId);
this.selectedSequenceFiles = selectedSequenceFiles;
this.selectedUnsequenceFiles = selectedUnsequenceFiles;
- this.seqTsFileResourceList =
- tsFileManager.getOrCreateSequenceListByTimePartition(timePartition);
- this.unseqTsFileResourceList =
- tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition);
+ this.emptyTargetTsFileResourceList = new ArrayList<>();
this.performer = performer;
this.hashCode = this.toString().hashCode();
this.memoryCost = memoryCost;
@@ -86,9 +90,44 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
createSummary();
}
+ /**
+ * Creates a task from a compaction log file only. The source and target file lists are not
+ * set here; needRecoverTaskInfoFromLogFile is set so that recover() first loads them from the
+ * log file through recoverTaskInfoFromLogFile().
+ *
+ * <p>NOTE(review): the time partition and serial id are both passed to the super constructor
+ * as 0L - confirm nothing relies on them for a task created this way.
+ *
+ * @param databaseName name passed to the super constructor as the storage group name
+ * @param dataRegionId id of the data region the task belongs to
+ * @param tsFileManager the TsFileManager of that data region
+ * @param logFile the compaction log file the task information is read from
+ */
+ public CrossSpaceCompactionTask(
+ String databaseName, String dataRegionId, TsFileManager tsFileManager,
File logFile) {
+ super(databaseName, dataRegionId, 0L, tsFileManager, 0L,
CompactionTaskType.NORMAL);
+ this.logFile = logFile;
+ this.needRecoverTaskInfoFromLogFile = true;
+ }
+
+  /**
+   * Rebuilds the task information (source files, target files, empty target files and task
+   * stage) from the compaction log file.
+   *
+   * <p>Every file list is created here before it is filled, because the constructor that takes
+   * a log file does not initialise any of them.
+   *
+   * @throws IOException propagated from analysing the compaction log file
+   */
+  private void recoverTaskInfoFromLogFile() throws IOException {
+    CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(this.logFile);
+    logAnalyzer.analyze();
+
+    this.selectedSequenceFiles = new ArrayList<>();
+    this.selectedUnsequenceFiles = new ArrayList<>();
+    this.targetTsfileResourceList = new ArrayList<>();
+    this.emptyTargetTsFileResourceList = new ArrayList<>();
+
+    // split the logged source files into sequence and unsequence files, keeping the log order
+    // NOTE(review): getFileFromDataDirs() may return null for a source file that is already
+    // deleted - confirm TsFileResource accepts that.
+    List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos();
+    for (TsFileIdentifier source : sourceFileIdentifiers) {
+      TsFileResource sourceTsFile = new TsFileResource(source.getFileFromDataDirs());
+      if (source.isSequence()) {
+        this.selectedSequenceFiles.add(sourceTsFile);
+      } else {
+        this.selectedUnsequenceFiles.add(sourceTsFile);
+      }
+    }
+
+    List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos();
+    List<TsFileIdentifier> deletedTargetFileIdentifiers = logAnalyzer.getDeletedTargetFileInfos();
+    for (TsFileIdentifier f : targetFileIdentifiers) {
+      File targetFileOnDisk = getRealTargetFile(f, IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX);
+      // The targetFileOnDisk may be null, but it won't impact the task recover stage
+      TsFileResource targetTsFile = new TsFileResource(targetFileOnDisk);
+      this.targetTsfileResourceList.add(targetTsFile);
+      // targets logged as deleted are the ones that were empty after compaction
+      if (deletedTargetFileIdentifiers.contains(f)) {
+        this.emptyTargetTsFileResourceList.add(targetTsFile);
+      }
+    }
+    this.taskStage = logAnalyzer.getTaskStage();
+  }
+
@Override
@SuppressWarnings({"squid:S6541", "squid:S3776", "squid:S2142"})
public boolean doCompaction() {
+ recoverMemoryStatus = true;
boolean isSuccess = true;
try {
if (!tsFileManager.isAllowCompaction()) {
@@ -140,11 +179,11 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
+ targetTsfileResourceList.get(0).getTsFile().getName()
+ CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
- try (CompactionLogger compactionLogger = new CompactionLogger(logFile)) {
+ try (SimpleCompactionLogger compactionLogger = new
SimpleCompactionLogger(logFile)) {
// print the path of the temporary file first for priority check
during recovery
- compactionLogger.logFiles(selectedSequenceFiles,
CompactionLogger.STR_SOURCE_FILES);
- compactionLogger.logFiles(selectedUnsequenceFiles,
CompactionLogger.STR_SOURCE_FILES);
- compactionLogger.logFiles(targetTsfileResourceList,
CompactionLogger.STR_TARGET_FILES);
+ compactionLogger.logSourceFiles(selectedSequenceFiles);
+ compactionLogger.logSourceFiles(selectedUnsequenceFiles);
+ compactionLogger.logTargetFiles(targetTsfileResourceList);
compactionLogger.force();
performer.setSourceFiles(selectedSequenceFiles,
selectedUnsequenceFiles);
@@ -170,7 +209,8 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
// find empty target files and add log
for (TsFileResource targetResource : targetTsfileResourceList) {
if (targetResource.isDeleted()) {
- compactionLogger.logFile(targetResource,
CompactionLogger.STR_DELETED_TARGET_FILES);
+ emptyTargetTsFileResourceList.add(targetResource);
+ compactionLogger.logEmptyTargetFile(targetResource);
compactionLogger.force();
}
}
@@ -245,39 +285,83 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
(selectedSeqFileSize + selectedUnseqFileSize) / 1024.0d /
1024.0d / costTime),
summary);
}
- if (logFile.exists()) {
- FileUtils.delete(logFile);
- }
+ Files.deleteIfExists(logFile.toPath());
} catch (Exception e) {
isSuccess = false;
- // catch throwable to handle OOM errors
- if (!(e instanceof InterruptedException)) {
- LOGGER.error(
- "{}-{} [Compaction] Meet errors in cross space compaction.",
- storageGroupName,
- dataRegionId,
- e);
+ printLogWhenException(LOGGER, e);
+ recover();
+ } finally {
+ releaseAllLocks();
+ }
+ return isSuccess;
+ }
+
+ /**
+ * Brings the task back to a consistent state after a failure or a restart. The task
+ * information is first reloaded from the log file when needRecoverTaskInfoFromLogFile is set.
+ * The task is then rolled back if all source files still exist, and finished otherwise. Any
+ * exception raised on the way is passed to handleRecoverException.
+ */
+ @Override
+ public void recover() {
+ try {
+ if (needRecoverTaskInfoFromLogFile) {
+ recoverTaskInfoFromLogFile();
+ }
+ if (shouldRollback()) {
+ rollback();
} else {
- LOGGER.warn("{}-{} [Compaction] Compaction interrupted",
storageGroupName, dataRegionId);
- // clean the interrupted flag
- Thread.interrupted();
+ // Some source file (or its resource file) no longer exists, so the task cannot be
+ // rolled back; finish it instead.
+ finishTask();
}
+ } catch (Exception e) {
+ handleRecoverException(e);
+ }
+ }
- // handle exception
- CompactionExceptionHandler.handleException(
- storageGroupName + "-" + dataRegionId,
- logFile,
+  /**
+   * Decides how a failed or interrupted task is recovered.
+   *
+   * @return {@code true} if every selected sequence and unsequence source file still exists
+   *     together with its resource file, i.e. the task can be rolled back
+   */
+  private boolean shouldRollback() {
+    boolean allSeqSourcesExist = checkAllSourceFileExists(selectedSequenceFiles);
+    return allSeqSourcesExist && checkAllSourceFileExists(selectedUnsequenceFiles);
+  }
+
+ /**
+ * Rolls the task back: restores the source files in memory if needed, removes the compaction
+ * mods files of the source files and deletes the target files.
+ *
+ * @throws IOException propagated from the in-memory replacement or from removing a mods file
+ * @throws CompactionRecoverException if a target file cannot be deleted
+ */
+ private void rollback() throws IOException {
+ // doCompaction() sets recoverMemoryStatus when the task starts running; only then are the
+ // target files swapped back for the source files in the in-memory lists.
+ // NOTE(review): targetTsfileResourceList is passed here without the null check that is
+ // applied before deleting the target files below - confirm it cannot be null at this point.
+ if (recoverMemoryStatus) {
+ replaceTsFileInMemory(
targetTsfileResourceList,
- selectedSequenceFiles,
- selectedUnsequenceFiles,
- tsFileManager,
- timePartition,
- false,
- true);
- } finally {
- releaseAllLocks();
+ Stream.concat(selectedSequenceFiles.stream(),
selectedUnsequenceFiles.stream())
+ .collect(Collectors.toList()));
}
- return isSuccess;
+ deleteCompactionModsFile(selectedSequenceFiles);
+ deleteCompactionModsFile(selectedUnsequenceFiles);
+ // delete target file
+ if (targetTsfileResourceList != null &&
!deleteTsFilesOnDisk(targetTsfileResourceList)) {
+ throw new CompactionRecoverException(
+ String.format("failed to delete target file %s", targetTsfileResourceList));
+ }
+ }
+
+  /**
+   * Finishes a task that cannot be rolled back: removes the empty target files, checks that
+   * every other target file is complete, then deletes the source files and their compaction
+   * mods files.
+   *
+   * @throws IOException propagated from removing a compaction mods file
+   * @throws CompactionRecoverException if an empty target file or a source file cannot be
+   *     deleted, or if a target file is missing or incomplete
+   */
+  private void finishTask() throws IOException {
+    for (TsFileResource target : targetTsfileResourceList) {
+      if (target.isDeleted() || emptyTargetTsFileResourceList.contains(target)) {
+        // it means the target file is empty after compaction
+        // remove() returning false is the failure case, as in deleteTsFilesOnDisk
+        if (!target.remove()) {
+          throw new CompactionRecoverException(
+              String.format("failed to delete empty target file %s", target));
+        }
+        continue;
+      }
+      File targetFile = target.getTsFile();
+      if (targetFile == null || !TsFileUtils.isTsFileComplete(target.getTsFile())) {
+        throw new CompactionRecoverException(
+            String.format("Target file is not completed. %s", targetFile));
+      }
+      if (recoverMemoryStatus) {
+        target.setStatus(TsFileResourceStatus.NORMAL);
+      }
+    }
+    if (!deleteTsFilesOnDisk(selectedSequenceFiles)
+        || !deleteTsFilesOnDisk(selectedUnsequenceFiles)) {
+      throw new CompactionRecoverException("source files cannot be deleted successfully");
+    }
+    if (recoverMemoryStatus) {
+      FileMetrics.getInstance().deleteTsFile(true, selectedSequenceFiles);
+      // NOTE(review): the first argument is also `true` for the unsequence files - confirm
+      // against FileMetrics.deleteTsFile whether it should differ from the call above.
+      FileMetrics.getInstance().deleteTsFile(true, selectedUnsequenceFiles);
+    }
+    deleteCompactionModsFile(selectedSequenceFiles);
+    deleteCompactionModsFile(selectedUnsequenceFiles);
   }
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index ccde5bf6623..e52fbddfa74 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -23,57 +23,53 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class InnerSpaceCompactionTask extends AbstractCompactionTask {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
protected List<TsFileResource> selectedTsFileResourceList;
protected TsFileResource targetTsFileResource;
+ protected boolean isTargetTsFileEmpty;
protected boolean sequence;
protected long selectedFileSize;
protected int sumOfCompactionCount;
protected long maxFileVersion;
protected int maxCompactionCount;
private File logFile;
-
- protected TsFileResourceList tsFileResourceList;
protected List<TsFileResource> targetTsFileList;
protected boolean[] isHoldingWriteLock;
-
protected long maxModsFileSize;
-
protected AbstractInnerSpaceEstimator innerSpaceEstimator;
+ protected boolean needRecoverTaskInfoFromLogFile;
public InnerSpaceCompactionTask(
long timePartition,
@@ -92,6 +88,34 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
CompactionTaskType.NORMAL);
}
+ /**
+ * Creates a task that is described only by a compaction log file. The log file is stored and
+ * needRecoverTaskInfoFromLogFile is set, so that recover() reads the task info from that
+ * file before deciding what to do.
+ */
+ public InnerSpaceCompactionTask(
+ String databaseName, String dataRegionId, TsFileManager tsFileManager,
File logFile) {
+ super(databaseName, dataRegionId, 0L, tsFileManager, 0L,
CompactionTaskType.NORMAL);
+ this.logFile = logFile;
+ this.needRecoverTaskInfoFromLogFile = true;
+ }
+
+ /**
+ * Rebuilds the task fields from the compaction log: the source file list, the target file
+ * resource (only the first target entry of the log is used), the empty-target flag and the
+ * task stage.
+ *
+ * @throws IOException if the log file cannot be read
+ */
+ private void recoverTaskInfoFromLogFile() throws IOException {
+ CompactionLogAnalyzer logAnalyzer = new
CompactionLogAnalyzer(this.logFile);
+ logAnalyzer.analyze();
+ List<TsFileIdentifier> sourceFileIdentifiers =
logAnalyzer.getSourceFileInfos();
+ this.selectedTsFileResourceList = new ArrayList<>();
+ sourceFileIdentifiers.forEach(
+ f -> this.selectedTsFileResourceList.add(new
TsFileResource(f.getFileFromDataDirs())));
+
+ List<TsFileIdentifier> targetFileIdentifiers =
logAnalyzer.getTargetFileInfos();
+ List<TsFileIdentifier> deletedTargetFileIdentifiers =
logAnalyzer.getDeletedTargetFileInfos();
+ if (!targetFileIdentifiers.isEmpty()) {
+ File targetFileOnDisk =
+ getRealTargetFile(
+ targetFileIdentifiers.get(0),
IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX);
+ // The targetFileOnDisk may be null, but it won't impact the task
recover stage
+ this.targetTsFileResource = new TsFileResource(targetFileOnDisk);
+ }
+ // any "empty target" entry in the log marks the target file as empty
+ this.isTargetTsFileEmpty = !deletedTargetFileIdentifiers.isEmpty();
+ this.taskStage = logAnalyzer.getTaskStage();
+ }
+
public InnerSpaceCompactionTask(
long timePartition,
TsFileManager tsFileManager,
@@ -121,11 +145,6 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
isHoldingWriteLock[i] = false;
}
- if (sequence) {
- tsFileResourceList =
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition);
- } else {
- tsFileResourceList =
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition);
- }
this.hashCode = this.toString().hashCode();
this.innerSeqTask = sequence;
this.crossTask = false;
@@ -133,12 +152,25 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
createSummary();
}
+ /**
+ * Sets up targetTsFileResource and logFile for this run. The log file lives in the directory
+ * of the first selected source file and is named after the target file plus
+ * CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX.
+ */
+ private void prepare() throws IOException {
+ targetTsFileResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(
+ selectedTsFileResourceList, sequence);
+ String dataDirectory =
selectedTsFileResourceList.get(0).getTsFile().getParent();
+ logFile =
+ new File(
+ dataDirectory
+ + File.separator
+ + targetTsFileResource.getTsFile().getName()
+ + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
+ }
+
@Override
@SuppressWarnings({"squid:S6541", "squid:S3776", "squid:S2142"})
protected boolean doCompaction() {
long startTime = System.currentTimeMillis();
// get resource of target file
- String dataDirectory =
selectedTsFileResourceList.get(0).getTsFile().getParent();
+ recoverMemoryStatus = true;
LOGGER.info(
"{}-{} [Compaction] {} InnerSpaceCompaction task starts with {} files,
"
+ "total file size is {} MB.",
@@ -150,20 +182,12 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
boolean isSuccess = true;
try {
- targetTsFileResource =
- TsFileNameGenerator.getInnerCompactionTargetFileResource(
- selectedTsFileResourceList, sequence);
- logFile =
- new File(
- dataDirectory
- + File.separator
- + targetTsFileResource.getTsFile().getName()
- + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
- try (CompactionLogger compactionLogger = new CompactionLogger(logFile)) {
+ prepare();
+ try (SimpleCompactionLogger compactionLogger = new
SimpleCompactionLogger(logFile)) {
// Here is tmpTargetFile, which is xxx.target
targetTsFileList = new
ArrayList<>(Collections.singletonList(targetTsFileResource));
- compactionLogger.logFiles(selectedTsFileResourceList,
CompactionLogger.STR_SOURCE_FILES);
- compactionLogger.logFiles(targetTsFileList,
CompactionLogger.STR_TARGET_FILES);
+ compactionLogger.logSourceFiles(selectedTsFileResourceList);
+ compactionLogger.logTargetFile(targetTsFileResource);
compactionLogger.force();
LOGGER.info(
"{}-{} [Compaction] compaction with {}",
@@ -215,7 +239,8 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
}
if (targetTsFileResource.isDeleted()) {
- compactionLogger.logFile(targetTsFileResource,
CompactionLogger.STR_DELETED_TARGET_FILES);
+ compactionLogger.logEmptyTargetFile(targetTsFileResource);
+ isTargetTsFileEmpty = true;
compactionLogger.force();
}
@@ -233,7 +258,6 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
"{}-{} [Compaction] Compacted target files, try to get the write
lock of source files",
storageGroupName,
dataRegionId);
-
// release the read lock of all source files, and get the write lock
of them to delete them
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
selectedTsFileResourceList.get(i).writeLock();
@@ -291,52 +315,75 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d /
costTime),
summary);
}
- if (logFile.exists()) {
- FileUtils.delete(logFile);
- }
+ Files.deleteIfExists(logFile.toPath());
} catch (Exception e) {
isSuccess = false;
- // catch throwable to handle OOM errors
- if (!(e instanceof InterruptedException)) {
- LOGGER.error(
- "{}-{} [Compaction] Meet errors in inner space compaction.",
- storageGroupName,
- dataRegionId,
- e);
+ printLogWhenException(LOGGER, e);
+ recover();
+ } finally {
+ releaseAllLocks();
+ }
+ return isSuccess;
+ }
+
+ /**
+ * Recovers this task: loads the task info from the log file when required, then either rolls
+ * the task back or finishes it depending on shouldRollback(). Any exception is passed to
+ * handleRecoverException.
+ */
+ public void recover() {
+ try {
+ if (needRecoverTaskInfoFromLogFile) {
+ recoverTaskInfoFromLogFile();
+ }
+ if (shouldRollback()) {
+ rollback();
} else {
- // clean the interrupt flag
- LOGGER.warn("{}-{} [Compaction] Compaction interrupted",
storageGroupName, dataRegionId);
- Thread.interrupted();
+ // shouldRollback() returned false, so finish the task instead of rolling it back
+ finishTask();
}
+ } catch (Exception e) {
+ handleRecoverException(e);
+ }
+ }
- // handle exception
- if (isSequence()) {
- CompactionExceptionHandler.handleException(
- storageGroupName + "-" + dataRegionId,
- logFile,
- targetTsFileList,
- selectedTsFileResourceList,
- Collections.emptyList(),
- tsFileManager,
- timePartition,
- true,
- isSequence());
- } else {
- CompactionExceptionHandler.handleException(
- storageGroupName + "-" + dataRegionId,
- logFile,
- targetTsFileList,
- Collections.emptyList(),
- selectedTsFileResourceList,
- tsFileManager,
- timePartition,
- true,
- isSequence());
+ /**
+ * Rolls the task back: optionally reverts the in-memory state, deletes the compaction mods
+ * files of the selected source files and deletes the target file on disk.
+ */
+ private void rollback() throws IOException {
+ // if the task has started (doCompaction() sets recoverMemoryStatus), the in-memory state
+ // is reverted as well by handing target and sources to replaceTsFileInMemory
+ if (recoverMemoryStatus) {
+ replaceTsFileInMemory(
+ Collections.singletonList(targetTsFileResource),
selectedTsFileResourceList);
+ }
+ deleteCompactionModsFile(selectedTsFileResourceList);
+ // delete target file
+ if (targetTsFileResource != null &&
!deleteTsFileOnDisk(targetTsFileResource)) {
+ throw new CompactionRecoverException(
+ String.format("failed to delete target file %s",
targetTsFileResource));
+ }
+ }
+
+ /**
+ * Finishes the task: removes the target file if it is empty, otherwise checks that it is a
+ * complete TsFile; then deletes the source files and their compaction mods files.
+ *
+ * @throws IOException if one of the invoked file operations fails, or (as
+ *     CompactionRecoverException) if a deletion fails or the target file is incomplete
+ */
+ private void finishTask() throws IOException {
+ if (targetTsFileResource.isDeleted() || isTargetTsFileEmpty) {
+ // it means the target file is empty after compaction
+ // NOTE(review): assumes remove() returns true on success, matching the
+ // !deleteTsFilesOnDisk(...) check below - confirm against TsFileResource
+ if (!targetTsFileResource.remove()) {
+ throw new CompactionRecoverException(
+ String.format("failed to delete empty target file %s",
targetTsFileResource));
+ }
+ } else {
+ File targetFile = targetTsFileResource.getTsFile();
+ if (targetFile == null ||
!TsFileUtils.isTsFileComplete(targetTsFileResource.getTsFile())) {
+ throw new CompactionRecoverException(
+ String.format("Target file is not completed. %s", targetFile));
+ }
+ if (recoverMemoryStatus) {
+ targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
}
- } finally {
- releaseAllLocks();
}
- return isSuccess;
+ if (!deleteTsFilesOnDisk(selectedTsFileResourceList)) {
+ throw new CompactionRecoverException("source files cannot be deleted
successfully");
+ }
+ if (recoverMemoryStatus) {
+ FileMetrics.getInstance().deleteTsFile(true, selectedTsFileResourceList);
+ }
+ deleteCompactionModsFile(selectedTsFileResourceList);
+ }
+
+ /**
+ * Decides between rollback and finish in recover(): the task is rolled back exactly when
+ * checkAllSourceFileExists returns true for the selected source files.
+ */
+ private boolean shouldRollback() {
+ return checkAllSourceFileExists(selectedTsFileResourceList);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
index 61edea2d225..ca3404807ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogAnalyzer.java
@@ -19,26 +19,17 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Stream;
-import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.SEQUENCE_NAME_FROM_OLD;
import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES;
-import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_MERGE_START_FROM_OLD;
-import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SEQ_FILES_FROM_OLD;
import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES;
-import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES_FROM_OLD;
import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_TARGET_FILES;
-import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_TARGET_FILES_FROM_OLD;
-import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.STR_UNSEQ_FILES_FROM_OLD;
-import static
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger.UNSEQUENCE_NAME_FROM_OLD;
public class CompactionLogAnalyzer {
@@ -46,21 +37,18 @@ public class CompactionLogAnalyzer {
private final List<TsFileIdentifier> sourceFileInfos = new ArrayList<>();
private final List<TsFileIdentifier> targetFileInfos = new ArrayList<>();
private final List<TsFileIdentifier> deletedTargetFileInfos = new
ArrayList<>();
- private boolean isLogFromOld = false;
+ private CompactionTaskStage taskStage;
public CompactionLogAnalyzer(File logFile) {
this.logFile = logFile;
}
- /**
- * analyze (source files, target files, deleted target files).
- *
- * @throws IOException if io errors occurred
- */
- public void analyze() throws IOException {
- String currLine;
+ /** analyze (source files, target files, deleted target files). */
+ public void analyze() throws IOException, IllegalArgumentException {
try (BufferedReader bufferedReader = new BufferedReader(new
FileReader(logFile))) {
+ String currLine;
while ((currLine = bufferedReader.readLine()) != null) {
+ final String lineValue = currLine;
String fileInfo;
if (currLine.startsWith(STR_SOURCE_FILES)) {
fileInfo = currLine.replaceFirst(STR_SOURCE_FILES +
TsFileIdentifier.INFO_SEPARATOR, "");
@@ -68,100 +56,21 @@ public class CompactionLogAnalyzer {
} else if (currLine.startsWith(STR_TARGET_FILES)) {
fileInfo = currLine.replaceFirst(STR_TARGET_FILES +
TsFileIdentifier.INFO_SEPARATOR, "");
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo));
- } else {
+ } else if (currLine.startsWith(STR_DELETED_TARGET_FILES)) {
fileInfo =
currLine.replaceFirst(STR_DELETED_TARGET_FILES +
TsFileIdentifier.INFO_SEPARATOR, "");
deletedTargetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo));
+ } else if (Stream.of(CompactionTaskStage.values())
+ .anyMatch(stage -> lineValue.startsWith(stage.name()))) {
+ taskStage = CompactionTaskStage.valueOf(currLine);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("unknown compaction log line: %s", currLine));
}
}
}
}
- /**
- * Analyze inner space compaction log of previous version (<0.13).
- *
- * @throws IOException if io errors occurred
- */
- public void analyzeOldInnerCompactionLog() throws IOException {
- isLogFromOld = true;
- String currLine;
- try (BufferedReader bufferedReader = new BufferedReader(new
FileReader(logFile))) {
- while ((currLine = bufferedReader.readLine()) != null) {
- switch (currLine) {
- case STR_SOURCE_FILES_FROM_OLD:
- currLine = bufferedReader.readLine();
-
sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(currLine));
- break;
- case STR_TARGET_FILES_FROM_OLD:
- currLine = bufferedReader.readLine();
-
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(currLine));
- break;
- case STR_SOURCE_FILES:
- currLine = bufferedReader.readLine();
-
sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(currLine));
- break;
- case STR_TARGET_FILES:
- currLine = bufferedReader.readLine();
-
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(currLine));
- break;
- case SEQUENCE_NAME_FROM_OLD:
- case UNSEQUENCE_NAME_FROM_OLD:
- break;
- default:
- break;
- }
- }
- }
- }
-
- /**
- * Analyze cross space compaction log of previous version (<0.13).
- *
- * @throws IOException if io errors occurred
- */
- @SuppressWarnings("squid:S135")
- public void analyzeOldCrossCompactionLog() throws IOException {
- isLogFromOld = true;
- String currLine;
- boolean isSeqSource = true;
- try (BufferedReader bufferedReader = new BufferedReader(new
FileReader(logFile))) {
- while ((currLine = bufferedReader.readLine()) != null) {
- if (currLine.equals(STR_UNSEQ_FILES_FROM_OLD)) {
- isSeqSource = false;
- continue;
- } else if (currLine.equals(STR_SEQ_FILES_FROM_OLD)) {
- isSeqSource = true;
- continue;
- } else if (currLine.equals(STR_MERGE_START_FROM_OLD)) {
- break;
- }
- analyzeOldFilePath(isSeqSource, currLine);
- }
- }
- }
-
- private void analyzeOldFilePath(boolean isSeqSource, String oldFilePath) {
- if (oldFilePath.startsWith("root")) {
-
sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(oldFilePath));
- } else {
-
sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(oldFilePath));
- }
-
- int pos = oldFilePath.lastIndexOf(TsFileConstant.TSFILE_SUFFIX);
- if (isSeqSource) {
- String targetFilePath =
- oldFilePath.substring(0, pos)
- + TsFileConstant.TSFILE_SUFFIX
- + IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD
- + oldFilePath.substring(pos +
TsFileConstant.TSFILE_SUFFIX.length());
- if (oldFilePath.startsWith("root")) {
-
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(targetFilePath));
- } else {
-
targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromFilePath(targetFilePath));
- }
- }
- }
-
public List<TsFileIdentifier> getSourceFileInfos() {
return sourceFileInfos;
}
@@ -174,7 +83,7 @@ public class CompactionLogAnalyzer {
return deletedTargetFileInfos;
}
- public boolean isLogFromOld() {
- return isLogFromOld;
+ public CompactionTaskStage getTaskStage() {
+ return taskStage;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
index 9d02418b8ea..bef589a8c75 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
@@ -30,23 +30,11 @@ import java.util.List;
public class CompactionLogger implements AutoCloseable {
public static final String CROSS_COMPACTION_LOG_NAME_SUFFIX =
".cross-compaction.log";
- public static final String CROSS_COMPACTION_LOG_NAME_FROM_OLD = "merge.log";
public static final String INNER_COMPACTION_LOG_NAME_SUFFIX =
".inner-compaction.log";
- public static final String INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD =
".compaction.log";
public static final String STR_SOURCE_FILES = "source";
public static final String STR_TARGET_FILES = "target";
-
public static final String STR_DELETED_TARGET_FILES = "empty";
-
- public static final String STR_SOURCE_FILES_FROM_OLD = "info-source";
- public static final String STR_TARGET_FILES_FROM_OLD = "info-target";
- public static final String STR_SEQ_FILES_FROM_OLD = "seqFiles";
- public static final String STR_UNSEQ_FILES_FROM_OLD = "unseqFiles";
- public static final String SEQUENCE_NAME_FROM_OLD = "sequence";
- public static final String UNSEQUENCE_NAME_FROM_OLD = "unsequence";
- public static final String STR_MERGE_START_FROM_OLD = "merge start";
-
private FileOutputStream logStream;
public CompactionLogger(File logFile) throws IOException {
@@ -64,6 +52,12 @@ public class CompactionLogger implements AutoCloseable {
}
}
+ /**
+ * Writes the enum constant name of the given stage as its own line to the log stream and
+ * flushes the stream.
+ *
+ * @param stage the stage to record
+ * @throws IOException if writing to the log stream fails
+ */
+ public void logTaskStage(CompactionTaskStage stage) throws IOException {
+ logStream.write(stage.name().getBytes());
+ logStream.write(System.lineSeparator().getBytes());
+ logStream.flush();
+ }
+
public void logFile(TsFileResource tsFile, String flag) throws IOException {
String log =
flag
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java
new file mode 100644
index 00000000000..9a549a1790b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionTaskStage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log;
+
+/** Named stages of a compaction task, each carrying a human-readable description. */
+public enum CompactionTaskStage {
+ NOT_STARTED("task does not started"),
+ STARTED("task has started"),
+ TARGET_FILE_GENERATED("target file has been generated completely"),
+ TARGET_FILE_REPLACED("target file has been replaced in TsFileManager"),
+ SOURCE_FILE_CLEANED("source files are cleaned up"),
+ FINISHED("task finished");
+
+ /** Human-readable explanation of the stage. */
+ private final String description;
+
+ CompactionTaskStage(String description) {
+ this.description = description;
+ }
+
+ /** Returns the human-readable explanation of this stage. */
+ public String getDescription() {
+ return this.description;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java
new file mode 100644
index 00000000000..d0769f5ba5b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/SimpleCompactionLogger.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log;
+
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/** CompactionLogger with helper methods that log files as source, target or empty target
entries. */
+public class SimpleCompactionLogger extends CompactionLogger {
+
+ public SimpleCompactionLogger(File logFile) throws IOException {
+ super(logFile);
+ }
+
+ public void logSourceFiles(List<TsFileResource> sourceFiles) throws
IOException {
+ logFiles(sourceFiles, STR_SOURCE_FILES);
+ }
+
+ public void logTargetFile(TsFileResource targetFile) throws IOException {
+ logFile(targetFile, STR_TARGET_FILES);
+ }
+
+ public void logTargetFiles(List<TsFileResource> targetFiles) throws
IOException {
+ logFiles(targetFiles, STR_TARGET_FILES);
+ }
+
+ public void logEmptyTargetFile(TsFileResource emptyTargetFile) throws
IOException {
+ logFile(emptyTargetFile, STR_DELETED_TARGET_FILES);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
index e1ac67bd8ee..6b3b5fdddd7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
@@ -51,12 +51,6 @@ public class TsFileIdentifier {
public static final int TIME_PARTITION_OFFSET_IN_LOG = 3;
public static final int FILE_NAME_OFFSET_IN_LOG = 4;
- private static final int LOGICAL_SG_OFFSET_IN_LOG_FROM_OLD = 0;
- private static final int DATA_REGION_OFFSET_IN_LOG_FROM_OLD = 1;
- private static final int TIME_PARTITION_OFFSET_IN_LOG_FROM_OLD = 2;
- private static final int FILE_NAME_OFFSET_IN_LOG_FROM_OLD = 3;
- private static final int SEQUENCE_OFFSET_IN_LOG_FROM_OLD = 4;
-
private static final String SEQUENCE_STR = "sequence";
private static final String UNSEQUENCE_STR = "unsequence";
@@ -123,30 +117,6 @@ public class TsFileIdentifier {
splittedFileInfo[FILE_NAME_OFFSET_IN_LOG]);
}
- /**
- * This function generates an instance of CompactionFileIdentifier by
parsing the old info string
- * from previous version (<0.13) of a tsfile(usually recorded in a
compaction.log). Such as
- * “root.test.sg 0 0 1-1-0-0.tsfile true" from old cross space compaction
log and "root.test.sg 0
- * 0 1-1-0-0.tsfile sequence" from old inner space compaction log.
- */
- public static TsFileIdentifier getFileIdentifierFromOldInfoString(String
oldInfoString) {
- String[] splittedFileInfo = oldInfoString.split(INFO_SEPARATOR);
- int length = splittedFileInfo.length;
- if (length != 5) {
- throw new RuntimeException(
- String.format(
- "String %s is not a legal file info string from previous version
(<0.13)",
- oldInfoString));
- }
- return new TsFileIdentifier(
- splittedFileInfo[LOGICAL_SG_OFFSET_IN_LOG_FROM_OLD],
- splittedFileInfo[DATA_REGION_OFFSET_IN_LOG_FROM_OLD],
- splittedFileInfo[TIME_PARTITION_OFFSET_IN_LOG_FROM_OLD],
- splittedFileInfo[SEQUENCE_OFFSET_IN_LOG_FROM_OLD].equals("true")
- ||
splittedFileInfo[SEQUENCE_OFFSET_IN_LOG_FROM_OLD].equals(SEQUENCE_STR),
- splittedFileInfo[FILE_NAME_OFFSET_IN_LOG_FROM_OLD]);
- }
-
@Override
public String toString() {
return String.format(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
index 10ca08597bb..c539ba7647b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
@@ -177,6 +177,8 @@ public class AbstractCompactionTest {
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
BloomFilterCache.getInstance().clear();
+ tsFileManager.getOrCreateSequenceListByTimePartition(0);
+ tsFileManager.getOrCreateUnsequenceListByTimePartition(0);
}
/**
@@ -414,6 +416,7 @@ public class AbstractCompactionTest {
if (UNSEQ_DIRS.exists()) {
FileUtils.deleteDirectory(UNSEQ_DIRS);
}
+ tsFileManager.clear();
}
private void removeFiles() throws IOException {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithFastPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithFastPerformerTest.java
index 90116dced7a..5a7fe53dcf3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithFastPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithFastPerformerTest.java
@@ -419,10 +419,7 @@ public class CrossSpaceCompactionWithFastPerformerTest {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
- new TsFileManager(
- "root.compactionTest",
- "0",
-
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
+ getTsFileManager(),
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles(),
new FastCompactionPerformer(true),
@@ -722,10 +719,7 @@ public class CrossSpaceCompactionWithFastPerformerTest {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
- new TsFileManager(
- "root.compactionTest",
- "0",
-
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
+ getTsFileManager(),
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles(),
new FastCompactionPerformer(true),
@@ -1024,10 +1018,7 @@ public class CrossSpaceCompactionWithFastPerformerTest {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
- new TsFileManager(
- "root.compactionTest",
- "0",
-
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
+ getTsFileManager(),
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles(),
new FastCompactionPerformer(true),
@@ -1052,4 +1043,15 @@ public class CrossSpaceCompactionWithFastPerformerTest {
}
}
}
+
+ /**
+ * Builds the TsFileManager used by the tests in this class and requests the unsequence and
+ * sequence lists of time partition 0 up front via the getOrCreate... methods.
+ */
+ private TsFileManager getTsFileManager() {
+ TsFileManager tsFileManager =
+ new TsFileManager(
+ "root.compactionTest",
+ "0",
+ "target\\data\\sequence\\test\\root.compactionTest\\0\\0\\");
+ tsFileManager.getOrCreateUnsequenceListByTimePartition(0);
+ tsFileManager.getOrCreateSequenceListByTimePartition(0);
+ return tsFileManager;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerTest.java
index 59a273d3f4d..6206a50a545 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerTest.java
@@ -418,10 +418,7 @@ public class
CrossSpaceCompactionWithReadPointPerformerTest {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
- new TsFileManager(
- "root.compactionTest",
- "0",
-
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
+ getTsFileManager(),
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles(),
new ReadPointCompactionPerformer(),
@@ -721,10 +718,7 @@ public class
CrossSpaceCompactionWithReadPointPerformerTest {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
- new TsFileManager(
- "root.compactionTest",
- "0",
-
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
+ getTsFileManager(),
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles(),
new ReadPointCompactionPerformer(),
@@ -1023,10 +1017,7 @@ public class
CrossSpaceCompactionWithReadPointPerformerTest {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
- new TsFileManager(
- "root.compactionTest",
- "0",
-
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
+ getTsFileManager(),
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles(),
new ReadPointCompactionPerformer(),
@@ -1051,4 +1042,15 @@ public class
CrossSpaceCompactionWithReadPointPerformerTest {
}
}
}
+
+ private TsFileManager getTsFileManager() {
+ TsFileManager tsFileManager =
+ new TsFileManager(
+ "root.compactionTest",
+ "0",
+ "target\\data\\sequence\\test\\root.compactionTest\\0\\0\\");
+ tsFileManager.getOrCreateUnsequenceListByTimePartition(0);
+ tsFileManager.getOrCreateSequenceListByTimePartition(0);
+ return tsFileManager;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/RewriteCrossSpaceCompactionRecoverCompatibleTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/RewriteCrossSpaceCompactionRecoverCompatibleTest.java
deleted file mode 100644
index ed90707cd3c..00000000000
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/RewriteCrossSpaceCompactionRecoverCompatibleTest.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.storageengine.dataregion.compaction.recover;
-
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverTask;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
-import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import com.google.common.io.Files;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
-public class RewriteCrossSpaceCompactionRecoverCompatibleTest extends
AbstractCompactionTest {
- private final String oldThreadName = Thread.currentThread().getName();
-
- @Override
- @Before
- public void setUp()
- throws IOException, MetadataException, WriteProcessException,
InterruptedException {
- super.setUp();
- Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
- }
-
- @Override
- @After
- public void tearDown() throws StorageEngineException, IOException {
- super.tearDown();
- Thread.currentThread().setName(oldThreadName);
- }
-
- @Test
- public void testCompatibleWithAllSourceFilesExistWithFileInfo() throws
Exception {
- createFiles(6, 2, 3, 100, 0, 0, 50, 50, false, true);
- createFiles(3, 2, 3, 50, 0, 0, 50, 50, false, false);
- registerTimeseriesInMManger(2, 3, false);
- List<TsFileResource> tmpTargetResources = new ArrayList<>();
- for (TsFileResource resource : seqResources) {
- File mergeFile =
- new File(
- resource.getTsFilePath() +
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD);
- mergeFile.createNewFile();
- tmpTargetResources.add(new TsFileResource(mergeFile));
- }
-
- File logFile =
- new File(SEQ_DIRS.getPath(),
CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
- BufferedWriter logWriter = new BufferedWriter(new FileWriter(logFile));
- logWriter.write("seqFiles\n");
- for (TsFileResource tsFileResource : seqResources) {
- logWriter.write(
- String.format(
- COMPACTION_TEST_SG + " 0 0 %s true\n",
tsFileResource.getTsFile().getName()));
- }
- logWriter.write("unseqFiles\n");
- for (TsFileResource tsFileResource : unseqResources) {
- logWriter.write(
- String.format(
- COMPACTION_TEST_SG + " 0 0 %s false\n",
tsFileResource.getTsFile().getName()));
- }
- logWriter.close();
-
- for (int i = 0; i < seqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s0",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
seqResources.get(i), false);
- }
- for (int i = 0; i < unseqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s1",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
unseqResources.get(i), false);
- }
- ModificationFile mergeMods =
- new ModificationFile(
- SEQ_DIRS + File.separator +
IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
- mergeMods.write(new Deletion(new PartialPath("root.d1.s1"), 100, 0, 100));
- mergeMods.close();
-
- TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0",
SEQ_DIRS.getPath());
- new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, logFile,
false)
- .doCompaction();
-
- for (TsFileResource resource : seqResources) {
- Assert.assertTrue(resource.getTsFile().exists());
- Assert.assertTrue(resource.resourceFileExists());
- Assert.assertTrue(resource.getModFile().exists());
- }
- for (TsFileResource resource : unseqResources) {
- Assert.assertTrue(resource.getTsFile().exists());
- Assert.assertTrue(resource.resourceFileExists());
- Assert.assertTrue(resource.getModFile().exists());
- }
- for (TsFileResource resource : tmpTargetResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- }
- for (TsFileResource resource :
-
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources))
{
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- }
-
- Assert.assertFalse(logFile.exists());
- Assert.assertFalse(mergeMods.exists());
- }
-
- @Test
- public void testCompatibleWithSomeSourceFilesLostWithFileInfo() throws
Exception {
- createFiles(6, 2, 3, 100, 0, 0, 50, 50, false, true);
- createFiles(3, 2, 3, 50, 0, 0, 50, 50, false, false);
- registerTimeseriesInMManger(2, 3, false);
- List<TsFileResource> tmpTargetResources = new ArrayList<>();
- for (TsFileResource resource : seqResources) {
- File mergeFile =
- new File(
- resource.getTsFilePath() +
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD);
- Files.copy(resource.getTsFile(), mergeFile);
- tmpTargetResources.add(new TsFileResource(mergeFile));
- }
-
- File logFile =
- new File(SEQ_DIRS.getPath(),
CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
- BufferedWriter logWriter = new BufferedWriter(new FileWriter(logFile));
- logWriter.write("seqFiles\n");
- for (TsFileResource tsFileResource : seqResources) {
- logWriter.write(
- String.format(
- COMPACTION_TEST_SG + " 0 0 %s true\n",
tsFileResource.getTsFile().getName()));
- }
- logWriter.write("unseqFiles\n");
- for (TsFileResource tsFileResource : unseqResources) {
- logWriter.write(
- String.format(
- COMPACTION_TEST_SG + " 0 0 %s false\n",
tsFileResource.getTsFile().getName()));
- }
- logWriter.close();
-
- // First seq files lost
- seqResources.get(0).remove();
-
- for (int i = 0; i < seqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s0",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
seqResources.get(i), false);
- }
- for (int i = 0; i < unseqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s1",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
unseqResources.get(i), false);
- }
- ModificationFile mergeMods =
- new ModificationFile(
- SEQ_DIRS + File.separator +
IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
- mergeMods.write(new Deletion(new PartialPath("root.d1.s1"), 100, 0, 100));
- mergeMods.close();
-
- TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0",
SEQ_DIRS.getPath());
- new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, logFile,
false)
- .doCompaction();
-
- for (TsFileResource resource : seqResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- Assert.assertFalse(resource.getModFile().exists());
- }
- for (TsFileResource resource : unseqResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- Assert.assertFalse(resource.getModFile().exists());
- }
- for (TsFileResource resource : tmpTargetResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- }
- for (TsFileResource seqResource :
-
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources))
{
- TsFileResource resource =
- new TsFileResource(
- new File(
- seqResource
- .getTsFilePath()
- .replace(
- IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
- TsFileConstant.TSFILE_SUFFIX)));
- Assert.assertTrue(resource.getTsFile().exists());
- Assert.assertTrue(resource.resourceFileExists());
- Assert.assertTrue(resource.getModFile().exists());
- }
-
- Assert.assertFalse(logFile.exists());
- Assert.assertFalse(mergeMods.exists());
- }
-
- @Test
- public void testCompatibleWithAllSourceFilesExistWithFilePath() throws
Exception {
- createFiles(6, 2, 3, 100, 0, 0, 50, 50, false, true);
- createFiles(3, 2, 3, 50, 0, 0, 50, 50, false, false);
- registerTimeseriesInMManger(2, 3, false);
- List<TsFileResource> tmpTargetResources = new ArrayList<>();
- for (TsFileResource resource : seqResources) {
- File mergeFile =
- new File(
- resource.getTsFilePath() +
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD);
- mergeFile.createNewFile();
- tmpTargetResources.add(new TsFileResource(mergeFile));
- }
-
- File logFile =
- new File(SEQ_DIRS.getPath(),
CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
- BufferedWriter logWriter = new BufferedWriter(new FileWriter(logFile));
- logWriter.write("seqFiles\n");
- for (TsFileResource tsFileResource : seqResources) {
- logWriter.write(tsFileResource.getTsFile().getAbsolutePath());
- logWriter.write("\n");
- }
- logWriter.write("unseqFiles\n");
- for (TsFileResource tsFileResource : unseqResources) {
- logWriter.write(tsFileResource.getTsFile().getAbsolutePath());
- logWriter.write("\n");
- }
- logWriter.write(CompactionLogger.STR_MERGE_START_FROM_OLD);
- logWriter.write("\n");
- logWriter.write(seqResources.get(0).getTsFile().getAbsolutePath() + " " +
100);
- logWriter.close();
-
- for (int i = 0; i < seqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s0",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
seqResources.get(i), false);
- }
- for (int i = 0; i < unseqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s1",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
unseqResources.get(i), false);
- }
- ModificationFile mergeMods =
- new ModificationFile(
- SEQ_DIRS + File.separator +
IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
- mergeMods.write(new Deletion(new PartialPath("root.d1.s1"), 100, 0, 100));
- mergeMods.close();
-
- TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0",
SEQ_DIRS.getPath());
- new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, logFile,
false)
- .doCompaction();
-
- for (TsFileResource resource : seqResources) {
- Assert.assertTrue(resource.getTsFile().exists());
- Assert.assertTrue(resource.resourceFileExists());
- Assert.assertTrue(resource.getModFile().exists());
- }
- for (TsFileResource resource : unseqResources) {
- Assert.assertTrue(resource.getTsFile().exists());
- Assert.assertTrue(resource.resourceFileExists());
- Assert.assertTrue(resource.getModFile().exists());
- }
- for (TsFileResource resource : tmpTargetResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- }
- for (TsFileResource resource :
-
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources))
{
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- }
-
- Assert.assertFalse(logFile.exists());
- Assert.assertFalse(mergeMods.exists());
- }
-
- @Test
- public void testCompatibleWithSomeSourceFilesLostWithFilePath() throws
Exception {
- createFiles(6, 2, 3, 100, 0, 0, 50, 50, false, true);
- createFiles(3, 2, 3, 50, 0, 0, 50, 50, false, false);
- registerTimeseriesInMManger(2, 3, false);
- List<TsFileResource> tmpTargetResources = new ArrayList<>();
- for (TsFileResource resource : seqResources) {
- File mergeFile =
- new File(
- resource.getTsFilePath() +
IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD);
- Files.copy(resource.getTsFile(), mergeFile);
- tmpTargetResources.add(new TsFileResource(mergeFile));
- }
-
- File logFile =
- new File(SEQ_DIRS.getPath(),
CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
- BufferedWriter logWriter = new BufferedWriter(new FileWriter(logFile));
- logWriter.write("seqFiles\n");
- for (TsFileResource tsFileResource : seqResources) {
- logWriter.write(tsFileResource.getTsFile().getAbsolutePath());
- logWriter.write("\n");
- }
- logWriter.write("unseqFiles\n");
- for (TsFileResource tsFileResource : unseqResources) {
- logWriter.write(tsFileResource.getTsFile().getAbsolutePath());
- logWriter.write("\n");
- }
- logWriter.write(CompactionLogger.STR_MERGE_START_FROM_OLD);
- logWriter.write("\n");
- logWriter.write(seqResources.get(0).getTsFile().getAbsolutePath() + " " +
100);
- logWriter.close();
-
- // First seq files lost
- seqResources.get(0).remove();
-
- for (int i = 0; i < seqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s0",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
seqResources.get(i), false);
- }
- for (int i = 0; i < unseqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s1",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
unseqResources.get(i), false);
- }
- ModificationFile mergeMods =
- new ModificationFile(
- SEQ_DIRS + File.separator +
IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
- mergeMods.write(new Deletion(new PartialPath("root.d1.s1"), 100, 0, 100));
- mergeMods.close();
-
- TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0",
SEQ_DIRS.getPath());
- new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, logFile,
false)
- .doCompaction();
-
- for (TsFileResource resource : seqResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- Assert.assertFalse(resource.getModFile().exists());
- }
- for (TsFileResource resource : unseqResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- Assert.assertFalse(resource.getModFile().exists());
- }
- for (TsFileResource resource : tmpTargetResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- }
- for (TsFileResource seqResource :
-
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources))
{
- TsFileResource resource =
- new TsFileResource(
- new File(
- seqResource
- .getTsFilePath()
- .replace(
- IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
- TsFileConstant.TSFILE_SUFFIX)));
- Assert.assertTrue(resource.getTsFile().exists());
- Assert.assertTrue(resource.resourceFileExists());
- Assert.assertTrue(resource.getModFile().exists());
- }
-
- Assert.assertFalse(logFile.exists());
- Assert.assertFalse(mergeMods.exists());
- }
-}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverCompatibleTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverCompatibleTest.java
deleted file mode 100644
index 35793690f82..00000000000
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverCompatibleTest.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.storageengine.dataregion.compaction.recover;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverTask;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
-/** This test is used to test the compatibility of compaction recovery with
0.12. */
-public class SizeTieredCompactionRecoverCompatibleTest extends
AbstractCompactionTest {
- @Override
- @Before
- public void setUp()
- throws IOException, MetadataException, WriteProcessException,
InterruptedException {
- super.setUp();
- }
-
- @Override
- @After
- public void tearDown() throws StorageEngineException, IOException {
- super.tearDown();
- }
-
- @Test
- public void testCompatibleWithAllSourceFilesExistWithFilePath() throws
Exception {
- createFiles(6, 2, 3, 100, 0, 0, 50, 50, false, true);
- registerTimeseriesInMManger(2, 3, false);
- TsFileResource targetResource =
- TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources,
true);
- ICompactionPerformer performer = new
ReadChunkCompactionPerformer(seqResources, targetResource);
- performer.setSummary(new CompactionTaskSummary());
- performer.perform();
- RandomAccessFile targetFile = new
RandomAccessFile(targetResource.getTsFile(), "rw");
- long fileLength = targetFile.length();
- targetFile.getChannel().truncate(fileLength - 20);
- targetFile.close();
-
- for (int i = 0; i < seqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s0",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
seqResources.get(i), false);
- }
-
- File logFile =
- new File(
- targetResource.getTsFile().getParent(),
- COMPACTION_TEST_SG +
CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD);
- BufferedWriter logWriter = new BufferedWriter(new FileWriter(logFile));
- for (TsFileResource tsFileResource : seqResources) {
- logWriter.write(
- String.format(
- "info-source\n%s 0 0 %s sequence\n",
- COMPACTION_TEST_SG, tsFileResource.getTsFile().getName()));
- }
- logWriter.write("sequence\n");
- logWriter.write(
- String.format(
- "info-target\n%s 0 0 %s sequence\n",
- COMPACTION_TEST_SG, targetResource.getTsFile().getName()));
- logWriter.close();
-
- TsFileManager tsFileManager =
- new TsFileManager(COMPACTION_TEST_SG, "0",
targetResource.getTsFile().getParent());
- tsFileManager.addAll(seqResources, true);
- CompactionRecoverTask recoverTask =
- new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager,
logFile, true);
- recoverTask.doCompaction();
-
- for (TsFileResource resource : seqResources) {
- Assert.assertTrue(resource.getTsFile().exists());
- Assert.assertTrue(resource.resourceFileExists());
- Assert.assertTrue(resource.getModFile().exists());
- }
-
- Assert.assertFalse(targetResource.getTsFile().exists());
- Assert.assertFalse(targetResource.resourceFileExists());
- Assert.assertFalse(targetResource.getModFile().exists());
- Assert.assertFalse(logFile.exists());
- }
-
- @Test
- public void testCompatibleWithSomeSourceFilesLostWithFilePath() throws
Exception {
- createFiles(6, 2, 3, 100, 0, 0, 50, 50, false, true);
- registerTimeseriesInMManger(2, 3, false);
- TsFileResource targetResource =
- TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources,
true);
- ICompactionPerformer performer = new
ReadChunkCompactionPerformer(seqResources, targetResource);
- performer.setSummary(new CompactionTaskSummary());
- performer.perform();
- CompactionUtils.moveTargetFile(
- Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
-
- // first source file does not exist
- seqResources.get(0).remove();
-
- for (int i = 0; i < seqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s0",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
seqResources.get(i), false);
- }
-
- File logFile =
- new File(
- targetResource.getTsFile().getParent(),
- COMPACTION_TEST_SG +
CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD);
- BufferedWriter logWriter = new BufferedWriter(new FileWriter(logFile));
- for (TsFileResource tsFileResource : seqResources) {
- logWriter.write(
- String.format(
- "info-source\n%s 0 0 %s sequence\n",
- COMPACTION_TEST_SG, tsFileResource.getTsFile().getName()));
- }
- logWriter.write("sequence\n");
- logWriter.write(
- String.format(
- "info-target\n%s 0 0 %s sequence\n",
- COMPACTION_TEST_SG, targetResource.getTsFile().getName()));
- logWriter.close();
-
- TsFileManager tsFileManager =
- new TsFileManager(COMPACTION_TEST_SG, "0",
targetResource.getTsFile().getParent());
- tsFileManager.addAll(seqResources, true);
- CompactionRecoverTask recoverTask =
- new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager,
logFile, true);
- recoverTask.doCompaction();
-
- for (TsFileResource resource : seqResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- Assert.assertFalse(resource.getModFile().exists());
- }
-
- Assert.assertTrue(targetResource.getTsFile().exists());
- Assert.assertTrue(targetResource.resourceFileExists());
- Assert.assertFalse(logFile.exists());
- }
-
- @Test
- public void testCompatibleWithAllSourceFilesExistWithFileInfo() throws
Exception {
- createFiles(6, 2, 3, 100, 0, 0, 50, 50, false, true);
- registerTimeseriesInMManger(2, 3, false);
- TsFileResource targetResource =
- TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources,
true);
- ICompactionPerformer performer = new
ReadChunkCompactionPerformer(seqResources, targetResource);
- performer.setSummary(new CompactionTaskSummary());
- performer.perform();
- RandomAccessFile targetFile = new
RandomAccessFile(targetResource.getTsFile(), "rw");
- long fileLength = targetFile.length();
- targetFile.getChannel().truncate(fileLength - 20);
- targetFile.close();
-
- File logFile =
- new File(targetResource.getTsFile().getParent(), "root.compactionTest"
+ ".compaction.log");
- BufferedWriter logWriter = new BufferedWriter(new FileWriter(logFile));
- for (TsFileResource tsFileResource : seqResources) {
- logWriter.write(String.format("source\n%s\n",
tsFileResource.getTsFile().getAbsolutePath()));
- }
- logWriter.write("sequence\n");
- logWriter.write(String.format("target\n%s\n",
targetResource.getTsFile().getAbsolutePath()));
- logWriter.close();
-
- TsFileManager tsFileManager =
- new TsFileManager("root.compactionTest", "0",
targetResource.getTsFile().getParent());
- tsFileManager.addAll(seqResources, true);
- CompactionRecoverTask recoverTask =
- new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager,
logFile, true);
- recoverTask.doCompaction();
-
- for (TsFileResource resource : seqResources) {
- Assert.assertTrue(resource.getTsFile().exists());
- }
-
- Assert.assertFalse(targetResource.getTsFile().exists());
- Assert.assertFalse(logFile.exists());
- }
-
- @Test
- public void testCompatibleWithSomeSourceFilesLostWithFileInfo() throws
Exception {
- createFiles(6, 2, 3, 100, 0, 0, 50, 50, false, false);
- registerTimeseriesInMManger(2, 3, false);
- TsFileResource targetResource =
-
TsFileNameGenerator.getInnerCompactionTargetFileResource(unseqResources, true);
- ICompactionPerformer performer = new
ReadChunkCompactionPerformer(seqResources, targetResource);
- performer.setSummary(new CompactionTaskSummary());
- performer.perform();
- CompactionUtils.moveTargetFile(
- Collections.singletonList(targetResource), true,
"root.compactionTest");
-
- // first source file does not exist
- unseqResources.get(0).remove();
-
- for (int i = 0; i < unseqResources.size(); i++) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- deleteMap.put(
- COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s0",
- new Pair(i * 10L, i * 10L + 10));
- CompactionFileGeneratorUtils.generateMods(deleteMap,
unseqResources.get(i), false);
- }
-
- File logFile =
- new File(targetResource.getTsFile().getParent(), "root.compactionTest"
+ ".compaction.log");
- BufferedWriter logWriter = new BufferedWriter(new FileWriter(logFile));
- for (TsFileResource tsFileResource : unseqResources) {
- logWriter.write(String.format("source\n%s\n",
tsFileResource.getTsFile().getAbsolutePath()));
- }
- logWriter.write("sequence\n");
- logWriter.write(String.format("target\n%s\n",
targetResource.getTsFile().getAbsolutePath()));
- logWriter.close();
-
- TsFileManager tsFileManager =
- new TsFileManager("root.compactionTest", "0",
targetResource.getTsFile().getParent());
- tsFileManager.addAll(unseqResources, false);
- CompactionRecoverTask recoverTask =
- new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager,
logFile, true);
- recoverTask.doCompaction();
-
- for (TsFileResource resource : unseqResources) {
- Assert.assertFalse(resource.getTsFile().exists());
- Assert.assertFalse(resource.resourceFileExists());
- Assert.assertFalse(resource.getModFile().exists());
- }
-
- Assert.assertTrue(targetResource.getTsFile().exists());
- Assert.assertTrue(targetResource.resourceFileExists());
- Assert.assertFalse(logFile.exists());
- }
-}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
index 3d23d55a511..eb419e9279b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/recover/SizeTieredCompactionRecoverTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.recover;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -30,9 +31,11 @@ import
org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionConfigRestorer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
@@ -197,11 +200,8 @@ public class SizeTieredCompactionRecoverTest {
/**
* Test a compaction task in finished. The compaction log use file info to
record files. The
* sources file are all existed.
- *
- * @throws Exception
*/
- @Test
- public void testRecoverWithCompleteTargetFileUsingFileInfo() throws
Exception {
+ private List<TsFileResource> getSourceFiles() throws IllegalPathException,
IOException {
List<TsFileResource> sourceFiles = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
@@ -226,11 +226,17 @@ public class SizeTieredCompactionRecoverTest {
fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
sourceFiles.add(tsFileResource);
}
+ return sourceFiles;
+ }
+
+ @Test
+ public void testRecoverWithCompleteTargetFileUsingFileInfo() throws
Exception {
+ List<TsFileResource> sourceFiles = getSourceFiles();
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles,
true);
CompactionLogger compactionLogger = new CompactionLogger(new
File(logFilePath));
compactionLogger.logFiles(sourceFiles, STR_SOURCE_FILES);
- compactionLogger.logFiles(Collections.singletonList(targetResource),
STR_TARGET_FILES);
+ compactionLogger.logFile(targetResource, STR_TARGET_FILES);
compactionLogger.close();
performer.setSourceFiles(sourceFiles);
performer.setTargetFiles(Collections.singletonList(targetResource));
@@ -238,6 +244,7 @@ public class SizeTieredCompactionRecoverTest {
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
+
CompactionRecoverTask recoverTask =
new CompactionRecoverTask(
COMPACTION_TEST_SG, "0", tsFileManager, new File(logFilePath),
true);
@@ -249,6 +256,31 @@ public class SizeTieredCompactionRecoverTest {
Assert.assertFalse(targetResource.getTsFile().exists());
}
+ @Test
+ public void testInnerRecoverWithCompleteTargetFileUsingFileInfo() throws
Exception {
+ List<TsFileResource> sourceFiles = getSourceFiles();
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles,
true);
+ SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(new
File(logFilePath));
+ compactionLogger.logSourceFiles(sourceFiles);
+ compactionLogger.logTargetFile(targetResource);
+ compactionLogger.close();
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
+
+ InnerSpaceCompactionTask innerSpaceCompactionTask =
+ new InnerSpaceCompactionTask(COMPACTION_TEST_SG, "0", tsFileManager,
new File(logFilePath));
+ innerSpaceCompactionTask.recover();
+ for (TsFileResource resource : sourceFiles) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ }
+ Assert.assertFalse(targetResource.getTsFile().exists());
+ }
+
/**
* Test a compaction task in not finished. The compaction log use file info
to record files.
*
@@ -256,30 +288,7 @@ public class SizeTieredCompactionRecoverTest {
*/
@Test
public void testRecoverWithIncompleteTargetFileUsingFileInfo() throws
Exception {
- List<TsFileResource> sourceFiles = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- new TsFileResource(
- new File(
- SEQ_FILE_DIR
- + File.separator.concat(
- i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
- sourceFiles.add(tsFileResource);
- }
+ List<TsFileResource> sourceFiles = getSourceFiles();
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles,
true);
CompactionLogger compactionLogger = new CompactionLogger(new
File(logFilePath));
@@ -307,6 +316,35 @@ public class SizeTieredCompactionRecoverTest {
Assert.assertFalse(targetResource.getTsFile().exists());
}
+ @Test
+ public void testInnerRecoverWithIncompleteTargetFileUsingFileInfo() throws
Exception {
+ List<TsFileResource> sourceFiles = getSourceFiles();
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles,
true);
+ SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(new
File(logFilePath));
+ compactionLogger.logSourceFiles(sourceFiles);
+ compactionLogger.logTargetFile(targetResource);
+ compactionLogger.close();
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
+ FileOutputStream targetStream = new
FileOutputStream(targetResource.getTsFile(), true);
+ FileChannel channel = targetStream.getChannel();
+ channel.truncate(targetResource.getTsFile().length() - 100);
+ channel.close();
+ InnerSpaceCompactionTask recoverTask =
+ new InnerSpaceCompactionTask(COMPACTION_TEST_SG, "0", tsFileManager,
new File(logFilePath));
+ recoverTask.recover();
+ // all the source files should still exist
+ for (TsFileResource resource : sourceFiles) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ }
+ Assert.assertFalse(targetResource.getTsFile().exists());
+ }
+
/**
* Test a compaction task in finished. The compaction log use file path to
record files. All the
* sources file is still existed.
@@ -315,30 +353,7 @@ public class SizeTieredCompactionRecoverTest {
*/
@Test
public void testRecoverWithCompleteTargetFileUsingFilePath() throws
Exception {
- List<TsFileResource> sourceFiles = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- new TsFileResource(
- new File(
- SEQ_FILE_DIR
- + File.separator.concat(
- i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
- sourceFiles.add(tsFileResource);
- }
+ List<TsFileResource> sourceFiles = getSourceFiles();
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles,
true);
CompactionLogger logger = new CompactionLogger(new File(logFilePath));
@@ -362,6 +377,31 @@ public class SizeTieredCompactionRecoverTest {
Assert.assertFalse(targetResource.getTsFile().exists());
}
+ @Test
+ public void testInnerRecoverWithCompleteTargetFileUsingFilePath() throws
Exception {
+ List<TsFileResource> sourceFiles = getSourceFiles();
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles,
true);
+ SimpleCompactionLogger logger = new SimpleCompactionLogger(new
File(logFilePath));
+ logger.logSourceFiles(sourceFiles);
+ logger.logTargetFile(targetResource);
+ logger.close();
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
+ InnerSpaceCompactionTask innerSpaceCompactionTask =
+ new InnerSpaceCompactionTask(COMPACTION_TEST_SG, "0", tsFileManager,
new File(logFilePath));
+ innerSpaceCompactionTask.recover();
+ // all the source files should still exist
+ for (TsFileResource resource : sourceFiles) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ }
+ Assert.assertFalse(targetResource.getTsFile().exists());
+ }
+
/**
* Test a compaction task in not finished. The compaction log use file path
to record files.
*
@@ -369,30 +409,7 @@ public class SizeTieredCompactionRecoverTest {
*/
@Test
public void testRecoverWithIncompleteTargetFileUsingFilePath() throws
Exception {
- List<TsFileResource> sourceFiles = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- new TsFileResource(
- new File(
- SEQ_FILE_DIR
- + File.separator.concat(
- i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
- sourceFiles.add(tsFileResource);
- }
+ List<TsFileResource> sourceFiles = getSourceFiles();
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles,
true);
CompactionLogger compactionLogger = new CompactionLogger(new
File(logFilePath));
@@ -420,6 +437,35 @@ public class SizeTieredCompactionRecoverTest {
Assert.assertFalse(targetResource.getTsFile().exists());
}
+ @Test
+ public void testInnerRecoverWithIncompleteTargetFileUsingFilePath() throws
Exception {
+ List<TsFileResource> sourceFiles = getSourceFiles();
+ TsFileResource targetResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles,
true);
+ SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(new
File(logFilePath));
+ compactionLogger.logSourceFiles(sourceFiles);
+ compactionLogger.logTargetFile(targetResource);
+ compactionLogger.close();
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
+ FileOutputStream targetStream = new
FileOutputStream(targetResource.getTsFile(), true);
+ FileChannel channel = targetStream.getChannel();
+ channel.truncate(targetResource.getTsFile().length() - 100);
+ channel.close();
+ InnerSpaceCompactionTask innerSpaceCompactionTask =
+ new InnerSpaceCompactionTask(COMPACTION_TEST_SG, "0", tsFileManager,
new File(logFilePath));
+ innerSpaceCompactionTask.recover();
+ // all the source files should still exist
+ for (TsFileResource resource : sourceFiles) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ }
+ Assert.assertFalse(targetResource.getTsFile().exists());
+ }
+
/**
* Test a compaction task is finished, and the data dirs of the system is
change. The compaction
* log use file info to record files.
@@ -429,32 +475,9 @@ public class SizeTieredCompactionRecoverTest {
@Test
public void
testRecoverWithCompleteTargetFileUsingFileInfoAndChangingDataDirs() throws
Exception {
try {
- List<TsFileResource> sourceFiles = new ArrayList<>();
+ List<TsFileResource> sourceFiles = getSourceFiles();
List<String> sourceFileNames = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- new TsFileResource(
- new File(
- SEQ_FILE_DIR
- + File.separator.concat(
- i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
- sourceFiles.add(tsFileResource);
- sourceFileNames.add(tsFileResource.getTsFile().getName());
- }
+ sourceFiles.forEach(f -> sourceFileNames.add(f.getTsFile().getName()));
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
CompactionLogger compactionLogger = new CompactionLogger(new
File(logFilePath));
@@ -467,7 +490,6 @@ public class SizeTieredCompactionRecoverTest {
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
- long sizeOfTargetFile = targetResource.getTsFileSize();
FileUtils.moveDirectory(
new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
@@ -519,6 +541,76 @@ public class SizeTieredCompactionRecoverTest {
}
}
+ @Test
+ public void
testInnerRecoverWithCompleteTargetFileUsingFileInfoAndChangingDataDirs()
+ throws Exception {
+ try {
+ List<TsFileResource> sourceFiles = getSourceFiles();
+ List<String> sourceFileNames = new ArrayList<>();
+ sourceFiles.forEach(f -> sourceFileNames.add(f.getTsFile().getName()));
+ TsFileResource targetResource =
+
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(new
File(logFilePath));
+ compactionLogger.logSourceFiles(sourceFiles);
+ compactionLogger.logTargetFile(targetResource);
+ compactionLogger.close();
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
+ FileUtils.moveDirectory(
+ new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
+ new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
+ setDataDirs(new String[][] {{TestConstant.BASE_OUTPUT_PATH +
File.separator + "data1"}});
+ InnerSpaceCompactionTask recoverTask =
+ new InnerSpaceCompactionTask(
+ COMPACTION_TEST_SG, "0", tsFileManager, new File(logFilePath));
+ recoverTask.recover();
+ // all the source files should exist
+ for (String sourceFileName : sourceFileNames) {
+ Assert.assertTrue(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ + File.separator
+ + "data1"
+ + File.separator
+ + "sequence"
+ + File.separator
+ + COMPACTION_TEST_SG
+ + File.separator
+ + "0"
+ + File.separator
+ + "0",
+ sourceFileName)
+ .exists());
+ }
+ File targetFileAfterMoved =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ + File.separator
+ + "data1"
+ + File.separator
+ + "sequence"
+ + File.separator
+ + COMPACTION_TEST_SG
+ + File.separator
+ + "0"
+ + File.separator
+ + "0",
+ targetResource
+ .getTsFile()
+ .getName()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX));
+ Assert.assertFalse(targetFileAfterMoved.exists());
+ } finally {
+ FileUtils.deleteDirectory(new File(TestConstant.BASE_OUTPUT_PATH +
File.separator + "data1"));
+ }
+ }
+
/**
* Test a compaction task is not finished, and the data dirs of the system
is change The
* compaction log use file info to record files.
@@ -529,32 +621,9 @@ public class SizeTieredCompactionRecoverTest {
public void
testRecoverWithIncompleteTargetFileUsingFileInfoAndChangingDataDirs()
throws Exception {
try {
- List<TsFileResource> sourceFiles = new ArrayList<>();
+ List<TsFileResource> sourceFiles = getSourceFiles();
List<String> sourceFileNames = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- new TsFileResource(
- new File(
- SEQ_FILE_DIR
- + File.separator.concat(
- i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
- sourceFiles.add(tsFileResource);
- sourceFileNames.add(tsFileResource.getTsFile().getName());
- }
+ sourceFiles.forEach(f -> sourceFileNames.add(f.getTsFile().getName()));
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
CompactionLogger compactionLogger = new CompactionLogger(new
File(logFilePath));
@@ -617,6 +686,75 @@ public class SizeTieredCompactionRecoverTest {
}
}
+ @Test
+ public void
testInnerRecoverWithIncompleteTargetFileUsingFileInfoAndChangingDataDirs()
+ throws Exception {
+ try {
+ List<TsFileResource> sourceFiles = getSourceFiles();
+ List<String> sourceFileNames = new ArrayList<>();
+ sourceFiles.forEach(f -> sourceFileNames.add(f.getTsFile().getName()));
+ TsFileResource targetResource =
+
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(new
File(logFilePath));
+ compactionLogger.logSourceFiles(sourceFiles);
+ compactionLogger.logTargetFile(targetResource);
+ compactionLogger.close();
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
+ FileOutputStream targetStream = new
FileOutputStream(targetResource.getTsFile(), true);
+ FileChannel channel = targetStream.getChannel();
+ channel.truncate(targetResource.getTsFile().length() - 100);
+ channel.close();
+ FileUtils.moveDirectory(
+ new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
+ new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
+ setDataDirs(new String[][] {{TestConstant.BASE_OUTPUT_PATH +
File.separator + "data1"}});
+ InnerSpaceCompactionTask recoverTask =
+ new InnerSpaceCompactionTask(
+ COMPACTION_TEST_SG, "0", tsFileManager, new File(logFilePath));
+ recoverTask.recover();
+ // all the source files should still exist
+ for (String sourceFileName : sourceFileNames) {
+ Assert.assertTrue(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ + File.separator
+ + "data1"
+ + File.separator
+ + "sequence"
+ + File.separator
+ + COMPACTION_TEST_SG
+ + File.separator
+ + "0"
+ + File.separator
+ + "0",
+ sourceFileName)
+ .exists());
+ }
+ Assert.assertFalse(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ + File.separator
+ + "data1"
+ + File.separator
+ + "sequence"
+ + File.separator
+ + COMPACTION_TEST_SG
+ + File.separator
+ + "0"
+ + File.separator
+ + "0",
+ targetResource.getTsFile().getName())
+ .exists());
+ } finally {
+ FileUtils.deleteDirectory(new File(TestConstant.BASE_OUTPUT_PATH +
File.separator + "data1"));
+ }
+ }
+
/**
* Test a compaction task is finished, and the data dirs of the system is
change. The compaction
* log use file path to record files.
@@ -626,32 +764,9 @@ public class SizeTieredCompactionRecoverTest {
@Test
public void
testRecoverWithCompleteTargetFileUsingFilePathAndChangingDataDirs() throws
Exception {
try {
- List<TsFileResource> sourceFiles = new ArrayList<>();
+ List<TsFileResource> sourceFiles = getSourceFiles();
List<String> sourceFileNames = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- new TsFileResource(
- new File(
- SEQ_FILE_DIR
- + File.separator.concat(
- i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
- sourceFiles.add(tsFileResource);
- sourceFileNames.add(tsFileResource.getTsFile().getName());
- }
+ sourceFiles.forEach(f -> sourceFileNames.add(f.getTsFile().getName()));
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
CompactionLogger compactionLogger = new CompactionLogger(new
File(logFilePath));
@@ -664,7 +779,6 @@ public class SizeTieredCompactionRecoverTest {
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
- long sizeOfTargetFile = targetResource.getTsFileSize();
FileUtils.moveDirectory(
new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
@@ -716,6 +830,76 @@ public class SizeTieredCompactionRecoverTest {
}
}
+ @Test
+ public void
testInnerRecoverWithCompleteTargetFileUsingFilePathAndChangingDataDirs()
+ throws Exception {
+ try {
+ List<TsFileResource> sourceFiles = getSourceFiles();
+ List<String> sourceFileNames = new ArrayList<>();
+ sourceFiles.forEach(f -> sourceFileNames.add(f.getTsFile().getName()));
+ TsFileResource targetResource =
+
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(new
File(logFilePath));
+ compactionLogger.logSourceFiles(sourceFiles);
+ compactionLogger.logTargetFile(targetResource);
+ compactionLogger.close();
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
+ FileUtils.moveDirectory(
+ new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
+ new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
+ setDataDirs(new String[][] {{TestConstant.BASE_OUTPUT_PATH +
File.separator + "data1"}});
+ InnerSpaceCompactionTask recoverTask =
+ new InnerSpaceCompactionTask(
+ COMPACTION_TEST_SG, "0", tsFileManager, new File(logFilePath));
+ recoverTask.recover();
+ // all the source files should exist
+ for (String sourceFileName : sourceFileNames) {
+ Assert.assertTrue(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ + File.separator
+ + "data1"
+ + File.separator
+ + "sequence"
+ + File.separator
+ + COMPACTION_TEST_SG
+ + File.separator
+ + "0"
+ + File.separator
+ + "0",
+ sourceFileName)
+ .exists());
+ }
+ File targetFileAfterMoved =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ + File.separator
+ + "data1"
+ + File.separator
+ + "sequence"
+ + File.separator
+ + COMPACTION_TEST_SG
+ + File.separator
+ + "0"
+ + File.separator
+ + "0",
+ targetResource
+ .getTsFile()
+ .getName()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX));
+ Assert.assertFalse(targetFileAfterMoved.exists());
+ } finally {
+ FileUtils.deleteDirectory(new File(TestConstant.BASE_OUTPUT_PATH +
File.separator + "data1"));
+ }
+ }
+
/**
* Test a compaction task is not finished, and the data dirs of the system
is change. The
* compaction log use file path to record files.
@@ -726,32 +910,9 @@ public class SizeTieredCompactionRecoverTest {
public void
testRecoverWithIncompleteTargetFileUsingFilePathAndChangingDataDirs()
throws Exception {
try {
- List<TsFileResource> sourceFiles = new ArrayList<>();
+ List<TsFileResource> sourceFiles = getSourceFiles();
List<String> sourceFileNames = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- new TsFileResource(
- new File(
- SEQ_FILE_DIR
- + File.separator.concat(
- i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + i
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
- sourceFiles.add(tsFileResource);
- sourceFileNames.add(tsFileResource.getTsFile().getName());
- }
+ sourceFiles.forEach(f -> sourceFileNames.add(f.getTsFile().getName()));
TsFileResource targetResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
CompactionLogger compactionLogger = new CompactionLogger(new
File(logFilePath));
@@ -813,4 +974,73 @@ public class SizeTieredCompactionRecoverTest {
FileUtils.deleteDirectory(new File(TestConstant.BASE_OUTPUT_PATH +
File.separator + "data1"));
}
}
+
+ @Test
+ public void
testInnerRecoverWithIncompleteTargetFileUsingFilePathAndChangingDataDirs()
+ throws Exception {
+ try {
+ List<TsFileResource> sourceFiles = getSourceFiles();
+ List<String> sourceFileNames = new ArrayList<>();
+ sourceFiles.forEach(f -> sourceFileNames.add(f.getTsFile().getName()));
+ TsFileResource targetResource =
+
TsFileNameGenerator.getInnerCompactionTargetFileResource(sourceFiles, true);
+ SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(new
File(logFilePath));
+ compactionLogger.logSourceFiles(sourceFiles);
+ compactionLogger.logTargetFile(targetResource);
+ compactionLogger.close();
+ performer.setSourceFiles(sourceFiles);
+ performer.setTargetFiles(Collections.singletonList(targetResource));
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(
+ Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
+ FileOutputStream targetStream = new
FileOutputStream(targetResource.getTsFile(), true);
+ FileChannel channel = targetStream.getChannel();
+ channel.truncate(targetResource.getTsFile().length() - 100);
+ channel.close();
+ FileUtils.moveDirectory(
+ new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
+ new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
+ setDataDirs(new String[][] {{TestConstant.BASE_OUTPUT_PATH +
File.separator + "data1"}});
+ InnerSpaceCompactionTask recoverTask =
+ new InnerSpaceCompactionTask(
+ COMPACTION_TEST_SG, "0", tsFileManager, new File(logFilePath));
+ recoverTask.recover();
+ // all the source files should still exist
+ for (String sourceFileName : sourceFileNames) {
+ Assert.assertTrue(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ + File.separator
+ + "data1"
+ + File.separator
+ + "sequence"
+ + File.separator
+ + COMPACTION_TEST_SG
+ + File.separator
+ + "0"
+ + File.separator
+ + "0",
+ sourceFileName)
+ .exists());
+ }
+ Assert.assertFalse(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ + File.separator
+ + "data1"
+ + File.separator
+ + "sequence"
+ + File.separator
+ + COMPACTION_TEST_SG
+ + File.separator
+ + "0"
+ + File.separator
+ + "0",
+ targetResource.getTsFile().getName())
+ .exists());
+ } finally {
+ FileUtils.deleteDirectory(new File(TestConstant.BASE_OUTPUT_PATH +
File.separator + "data1"));
+ }
+ }
}