This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new acca766 [To rel/0.12][IOTDB-2603]Fix compaction recover (#5113)
acca766 is described below
commit acca76652f2d841fad68ec8e6fd427c94c2f5b3a
Author: 周沛辰 <[email protected]>
AuthorDate: Fri Feb 25 09:37:32 2022 +0800
[To rel/0.12][IOTDB-2603]Fix compaction recover (#5113)
* add handle when some source files lost
* fix recover
* remove useless code in merge recover
* spotless
* serialize target file in recover
* update memory when some source files lost
---
.../resources/conf/iotdb-engine.properties | 5 -
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 -
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 -
.../db/engine/compaction/TsFileManagement.java | 19 +-
.../db/engine/merge/recover/MergeLogAnalyzer.java | 23 +-
.../merge/task/CompactionMergeRecoverTask.java | 12 +-
.../db/engine/merge/task/RecoverMergeTask.java | 86 +++-
.../engine/storagegroup/StorageGroupProcessor.java | 2 -
.../iotdb/db/engine/merge/MergeRecoverTest.java | 436 +++++++++++++++++++++
.../apache/iotdb/db/engine/merge/MergeTest.java | 2 +-
10 files changed, 546 insertions(+), 47 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index edc183f..fef0dec 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -451,11 +451,6 @@ timestamp_precision=ms
# If you are feeling the rebooting is too slow, set this to false, false by
default
# continue_merge_after_reboot=false
-# When set to true, all unseq merges becomes full merge (the whole SeqFiles
are re-written despite how
-# much they are overflowed). This may increase merge overhead depending on how
much the SeqFiles
-# are overflowed.
-# force_full_merge=true
-
# How many threads will be set up to perform compaction, 10 by default.
# Set to 1 when less than or equal to 0.
# compaction_thread_num=10
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d25ac44..cc46dd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1463,10 +1463,6 @@ public class IoTDBConfig {
return forceFullMerge;
}
- void setForceFullMerge(boolean forceFullMerge) {
- this.forceFullMerge = forceFullMerge;
- }
-
public int getCompactionThreadNum() {
return compactionThreadNum;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index fe4d760..0ff011f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -511,10 +511,6 @@ public class IoTDBDescriptor {
Long.parseLong(
properties.getProperty(
"compaction_interval",
Long.toString(conf.getCompactionInterval()))));
- conf.setForceFullMerge(
- Boolean.parseBoolean(
- properties.getProperty(
- "force_full_merge",
Boolean.toString(conf.isForceFullMerge()))));
conf.setCompactionThreadNum(
Integer.parseInt(
properties.getProperty(
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index ccfb35d..3e0f2c8 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -82,8 +82,6 @@ public abstract class TsFileManagement {
protected boolean isMergeExecutedInCurrentTask = false;
protected boolean isForceFullMerge =
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
- private final int maxOpenFileNumInEachUnseqCompaction =
-
IoTDBDescriptor.getInstance().getConfig().getMaxSelectUnseqFileNumInEachUnseqCompaction();
protected ReentrantLock compactionSelectionLock = new ReentrantLock();
@@ -336,6 +334,19 @@ public abstract class TsFileManagement {
seqFile.writeUnlock();
}
+ public void replace(
+ List<TsFileResource> seqResources,
+ List<TsFileResource> unseqResources,
+ List<TsFileResource> targetResources,
+ boolean isTargetSeq)
+ throws IOException {
+ writeLock();
+ removeAll(seqResources, true);
+ removeAll(unseqResources, false);
+ addAll(targetResources, isTargetSeq);
+ writeUnlock();
+ }
+
private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
writeLock();
try {
@@ -360,7 +371,7 @@ public abstract class TsFileManagement {
}
@SuppressWarnings("squid:S1141")
- private void updateMergeModification(TsFileResource seqFile) {
+ public void updateMergeModification(TsFileResource seqFile) {
try {
// remove old modifications and write modifications generated during
merge
seqFile.removeModFile();
@@ -387,7 +398,7 @@ public abstract class TsFileManagement {
}
}
- private void removeMergingModification() {
+ public void removeMergingModification() {
try {
if (mergingModification != null) {
mergingModification.remove();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
index 780eead..1203964 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
@@ -75,8 +75,8 @@ public class MergeLogAnalyzer {
try (BufferedReader bufferedReader = new BufferedReader(new
FileReader(logFile))) {
currLine = bufferedReader.readLine();
if (currLine != null) {
+ status = Status.All_SOURCE_FILES_EXIST;
analyzeSeqFiles(bufferedReader);
-
analyzeUnseqFiles(bufferedReader);
}
}
@@ -103,7 +103,9 @@ public class MergeLogAnalyzer {
mergeSeqFiles.add(seqFile);
// remove to speed-up next iteration
iterator.remove();
- currentFileFound = true;
+ if (seqFile.getTsFile().exists()) {
+ currentFileFound = true;
+ }
break;
}
}
@@ -119,7 +121,7 @@ public class MergeLogAnalyzer {
(System.currentTimeMillis() - startTime));
}
if (!allSourceFileExists) {
- status = Status.MERGE_END;
+ status = Status.SOME_SOURCE_FILES_LOST;
}
resource.setSeqFiles(mergeSeqFiles);
}
@@ -128,7 +130,6 @@ public class MergeLogAnalyzer {
if (!STR_UNSEQ_FILES.equals(currLine)) {
return;
}
- status = Status.MERGE_START;
long startTime = System.currentTimeMillis();
List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
boolean allSourceFileExists = true;
@@ -142,7 +143,9 @@ public class MergeLogAnalyzer {
mergeUnseqFiles.add(unseqFile);
// remove to speed-up next iteration
iterator.remove();
- currentFileFound = true;
+ if (unseqFile.getTsFile().exists()) {
+ currentFileFound = true;
+ }
break;
}
}
@@ -158,7 +161,7 @@ public class MergeLogAnalyzer {
(System.currentTimeMillis() - startTime));
}
if (!allSourceFileExists) {
- status = Status.MERGE_END;
+ status = Status.SOME_SOURCE_FILES_LOST;
}
resource.setUnseqFiles(mergeUnseqFiles);
}
@@ -166,9 +169,9 @@ public class MergeLogAnalyzer {
public enum Status {
// almost nothing has been done
NONE,
- // at least the files and timeseries to be merged are known
- MERGE_START,
- // all the merge files are merged with the origin files and the task is
almost done
- MERGE_END
+ // all source files exist
+ All_SOURCE_FILES_EXIST,
+ // some source files lost
+ SOME_SOURCE_FILES_LOST
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
index 27e038f..5771a7a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
@@ -21,14 +21,12 @@ package org.apache.iotdb.db.engine.merge.task;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.List;
public class CompactionMergeRecoverTask implements Runnable {
@@ -41,8 +39,6 @@ public class CompactionMergeRecoverTask implements Runnable {
public CompactionMergeRecoverTask(
TsFileManagement tsFileManagement,
- List<TsFileResource> seqFiles,
- List<TsFileResource> unseqFiles,
String storageGroupSysDir,
MergeCallback callback,
String taskName,
@@ -54,13 +50,7 @@ public class CompactionMergeRecoverTask implements Runnable {
this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
this.recoverMergeTask =
new RecoverMergeTask(
- seqFiles,
- unseqFiles,
- storageGroupSysDir,
- callback,
- taskName,
- fullMerge,
- storageGroupName);
+ tsFileManagement, storageGroupSysDir, callback, taskName,
fullMerge, storageGroupName);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index a23ee56..5965cb3 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -19,19 +19,26 @@
package org.apache.iotdb.db.engine.merge.task;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.merge.recover.MergeLogAnalyzer;
import org.apache.iotdb.db.engine.merge.recover.MergeLogAnalyzer.Status;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import static
org.apache.iotdb.db.engine.storagegroup.TsFileResource.modifyTsFileNameUnseqMergCnt;
+
/**
* RecoverMergeTask is an extension of MergeTask, which resumes the last merge
progress by scanning
* merge.log using LogAnalyzer and continue the unfinished merge.
@@ -41,17 +48,24 @@ public class RecoverMergeTask extends MergeTask {
private static final Logger logger =
LoggerFactory.getLogger(RecoverMergeTask.class);
private MergeLogAnalyzer analyzer;
+ private TsFileManagement tsFileManagement;
public RecoverMergeTask(
- List<TsFileResource> seqFiles,
- List<TsFileResource> unseqFiles,
+ TsFileManagement tsfileManagement,
String storageGroupSysDir,
MergeCallback callback,
String taskName,
boolean fullMerge,
String storageGroupName) {
super(
- seqFiles, unseqFiles, storageGroupSysDir, callback, taskName,
fullMerge, storageGroupName);
+ tsfileManagement.getTsFileList(true),
+ tsfileManagement.getTsFileList(false),
+ storageGroupSysDir,
+ callback,
+ taskName,
+ fullMerge,
+ storageGroupName);
+ this.tsFileManagement = tsfileManagement;
}
public void recoverMerge() throws IOException, MetadataException {
@@ -68,19 +82,79 @@ public class RecoverMergeTask extends MergeTask {
case NONE:
logFile.delete();
break;
- case MERGE_START:
- resumeAfterFilesLogged();
+ case All_SOURCE_FILES_EXIST:
+ handleWhenAllSourceFilesExist();
+ break;
+ case SOME_SOURCE_FILES_LOST:
+ handleWhenSomeSourceFilesLost();
break;
default:
throw new UnsupportedOperationException(taskName + " found
unrecognized status " + status);
}
+ if (logFile.exists()) {
+ logFile.delete();
+ }
if (logger.isInfoEnabled()) {
logger.info(
"{} merge recovery ends after {}ms", taskName,
(System.currentTimeMillis() - startTime));
}
}
- private void resumeAfterFilesLogged() throws IOException {
+ /** Delete .merge file and merging mods file. */
+ private void handleWhenAllSourceFilesExist() throws IOException {
cleanUp(false);
+ tsFileManagement.removeMergingModification();
+ }
+
+ /**
+ * 1. If target file does not exist, then move .merge file to target file
and serialize target
+ * resource file. <br>
+ * 2. Append merging modification to target mods file and delete merging
mods file. <br>
+ * 3. Delete source files and .merge file. <br>
+ * 4. Update resource memory of tsfileManagement. <br>
+ */
+ private void handleWhenSomeSourceFilesLost() throws IOException {
+ List<TsFileResource> targetResouces = new ArrayList<>();
+ for (TsFileResource sourceSeqResource : resource.getSeqFiles()) {
+ File targetFile =
modifyTsFileNameUnseqMergCnt(sourceSeqResource.getTsFile());
+ TsFileResource targetTsFileResouce = new TsFileResource(targetFile);
+ // move to target file and serialize resource file
+ if (!targetFile.exists()) {
+ // move target file
+ File tmpTargetFile = new File(sourceSeqResource.getTsFilePath() +
MERGE_SUFFIX);
+ FSFactoryProducer.getFSFactory().moveFile(tmpTargetFile, targetFile);
+
+ // serialize target resource file
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(targetFile.getAbsolutePath())) {
+ FileLoaderUtils.updateTsFileResource(reader, targetTsFileResouce);
+ }
+ targetTsFileResouce.serialize();
+ targetResouces.add(targetTsFileResouce);
+ }
+
+ // write merging modifications to new mods file
+ tsFileManagement.updateMergeModification(targetTsFileResouce);
+
+ // delete source seq file
+ sourceSeqResource.remove();
+
+ // delete merge file
+ File mergeFile = new File(sourceSeqResource.getTsFilePath() +
MERGE_SUFFIX);
+ if (mergeFile.exists()) {
+ mergeFile.delete();
+ }
+ }
+
+ // update memory
+ tsFileManagement.replace(
+ resource.getSeqFiles(), resource.getUnseqFiles(), targetResouces,
true);
+
+ // delete unseq source files
+ for (TsFileResource unseqResource : resource.getUnseqFiles()) {
+ unseqResource.remove();
+ }
+
+ // delete merging mods file
+ tsFileManagement.removeMergingModification();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 0df5743..971592c 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -517,8 +517,6 @@ public class StorageGroupProcessor {
CompactionMergeRecoverTask recoverTask =
new CompactionMergeRecoverTask(
tsFileManagement,
- new ArrayList<>(tsFileManagement.getTsFileList(true)),
- tsFileManagement.getTsFileList(false),
storageGroupSysDir.getPath(),
tsFileManagement::mergeEndAction,
taskName,
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeRecoverTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeRecoverTest.java
new file mode 100644
index 0000000..21fac51
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeRecoverTest.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.merge;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME;
+import static
org.apache.iotdb.db.engine.storagegroup.TsFileResource.modifyTsFileNameUnseqMergCnt;
+
+public class MergeRecoverTest extends MergeTest {
+ private List<TsFileResource> sourceSeqFiles = new ArrayList<>();
+ private List<TsFileResource> sourceUnseqFiles = new ArrayList<>();
+ private List<TsFileResource> tmpSourceSeqFiles = new ArrayList<>();
+ private List<TsFileResource> tmpSourceUnseqFiles = new ArrayList<>();
+ private File logFile = new File(TestConstant.SEQUENCE_DATA_DIR,
MergeLogger.MERGE_LOG_NAME);
+ private final int seqFileNum = 10;
+ private final int unseqFileNum = 5;
+ private final File seqDataDir = new File(TestConstant.SEQUENCE_DATA_DIR);
+ private final File unseqDataDir = new File(TestConstant.UNSEQUENCE_DATA_DIR);
+ private final ModificationFile mergingModsFile =
+ new ModificationFile(
+ TestConstant.SEQUENCE_DATA_DIR + File.separator +
MERGING_MODIFICATION_FILE_NAME);
+ private TsFileManagement tsFileManagement =
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCompactionStrategy()
+ .getTsFileManagement("root.sg1", "0",
TestConstant.SEQUENCE_DATA_DIR);
+
+ @Before
+ public void setUp() throws IOException, MetadataException,
WriteProcessException {
+ IoTDB.metaManager.init();
+ Assert.assertTrue(seqDataDir.mkdirs());
+ Assert.assertTrue(unseqDataDir.mkdirs());
+ prepareSeries();
+ createFiles();
+ tsFileManagement.addAll(sourceSeqFiles, true);
+ tsFileManagement.addAll(sourceUnseqFiles, false);
+ tsFileManagement.mergingModification = mergingModsFile;
+ MergeManager.getINSTANCE().start();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
+ IoTDB.metaManager.clear();
+ EnvironmentUtils.cleanAllDir();
+ MergeManager.getINSTANCE().stop();
+ deleteFiles();
+ }
+
+ /**
+ * source seq file index: 1~10<br>
+ * source unseq file index: 11~15<br>
+ * deleted file: 1.tsfile <br>
+ * no target file exist
+ */
+ @Test
+ public void testRecoverWithSomeSourceFilesLost() throws IOException,
MetadataException {
+ sourceSeqFiles.get(0).getTsFile().delete();
+ RecoverMergeTask recoverMergeTask =
+ new RecoverMergeTask(
+ tsFileManagement,
+ TestConstant.SEQUENCE_DATA_DIR,
+ tsFileManagement::mergeEndAction,
+ "recoverTest",
+ true,
+ "root.sg1");
+ recoverMergeTask.recoverMerge();
+ for (TsFileResource seqResource : tmpSourceSeqFiles) {
+ Assert.assertFalse(seqResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(seqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(new File(seqResource.getTsFilePath() +
MergeTask.MERGE_SUFFIX).exists());
+ Assert.assertFalse(seqResource.getModFile().exists());
+ File targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+ Assert.assertTrue(targetFile.exists());
+ Assert.assertTrue(new File(targetFile.getPath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ ModificationFile targetModsFile =
+ new ModificationFile(targetFile.getPath() +
ModificationFile.FILE_SUFFIX);
+ Assert.assertTrue(targetModsFile.exists());
+ Assert.assertEquals(2, targetModsFile.getModifications().size());
+ }
+ for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+ Assert.assertFalse(unseqResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(unseqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(unseqResource.getModFile().exists());
+ }
+ Assert.assertFalse(mergingModsFile.exists());
+ Assert.assertFalse(logFile.exists());
+ }
+
+ /**
+ * source seq file index: 1~10 <br>
+ * source unseq file index: 11~15 <br>
+ * deleted file: 1.tsfile, 1.tsfile.resource, 2.tsfile <br>
+ * existed target file: 1.tsfile, 1.tsfile.resource, 2.tsfile
+ */
+ @Test
+ public void testRecoverWithSomeSourceFilesLostAndSomeTargetFilesExist()
+ throws IOException, MetadataException {
+ File targetFile =
modifyTsFileNameUnseqMergCnt(sourceSeqFiles.get(0).getTsFile());
+
FSFactoryProducer.getFSFactory().moveFile(sourceSeqFiles.get(0).getTsFile(),
targetFile);
+ FSFactoryProducer.getFSFactory()
+ .moveFile(
+ new File(sourceSeqFiles.get(0).getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX),
+ new File(targetFile.getPath() + TsFileResource.RESOURCE_SUFFIX));
+ targetFile =
modifyTsFileNameUnseqMergCnt(sourceSeqFiles.get(1).getTsFile());
+
FSFactoryProducer.getFSFactory().moveFile(sourceSeqFiles.get(1).getTsFile(),
targetFile);
+ // sg recover will serialize resouce file
+ TsFileResource targetTsFileResouce = new TsFileResource(targetFile);
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(targetFile.getAbsolutePath())) {
+ FileLoaderUtils.updateTsFileResource(reader, targetTsFileResouce);
+ }
+ targetTsFileResouce.serialize();
+
+ RecoverMergeTask recoverMergeTask =
+ new RecoverMergeTask(
+ tsFileManagement,
+ TestConstant.SEQUENCE_DATA_DIR,
+ tsFileManagement::mergeEndAction,
+ "recoverTest",
+ true,
+ "root.sg1");
+ recoverMergeTask.recoverMerge();
+ for (TsFileResource seqResource : tmpSourceSeqFiles) {
+ Assert.assertFalse(seqResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(seqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(new File(seqResource.getTsFilePath() +
MergeTask.MERGE_SUFFIX).exists());
+ targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+ Assert.assertTrue(targetFile.exists());
+ Assert.assertTrue(new File(targetFile.getPath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(seqResource.getModFile().exists());
+ ModificationFile targetModsFile =
+ new ModificationFile(targetFile.getPath() +
ModificationFile.FILE_SUFFIX);
+ Assert.assertTrue(targetModsFile.exists());
+ Assert.assertEquals(2, targetModsFile.getModifications().size());
+ }
+ for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+ Assert.assertFalse(unseqResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(unseqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(unseqResource.getModFile().exists());
+ }
+ Assert.assertFalse(mergingModsFile.exists());
+ Assert.assertFalse(logFile.exists());
+ }
+
+ @Test
+ public void testRecoverWithAllSeqSourceFilesLost() throws IOException,
MetadataException {
+ for (TsFileResource resource : sourceSeqFiles) {
+ File targetFile = modifyTsFileNameUnseqMergCnt(resource.getTsFile());
+ FSFactoryProducer.getFSFactory()
+ .moveFile(new File(resource.getTsFilePath() +
MergeTask.MERGE_SUFFIX), targetFile);
+ FSFactoryProducer.getFSFactory()
+ .moveFile(
+ new File(resource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX),
+ new File(targetFile.getPath() + TsFileResource.RESOURCE_SUFFIX));
+ resource.remove();
+ }
+ RecoverMergeTask recoverMergeTask =
+ new RecoverMergeTask(
+ tsFileManagement,
+ TestConstant.SEQUENCE_DATA_DIR,
+ tsFileManagement::mergeEndAction,
+ "recoverTest",
+ true,
+ "root.sg1");
+ recoverMergeTask.recoverMerge();
+ for (TsFileResource seqResource : tmpSourceSeqFiles) {
+ Assert.assertFalse(seqResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(seqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(new File(seqResource.getTsFilePath() +
MergeTask.MERGE_SUFFIX).exists());
+ File targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+ Assert.assertTrue(targetFile.exists());
+ Assert.assertTrue(new File(targetFile.getPath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(seqResource.getModFile().exists());
+ ModificationFile targetModsFile =
+ new ModificationFile(targetFile.getPath() +
ModificationFile.FILE_SUFFIX);
+ Assert.assertTrue(targetModsFile.exists());
+ Assert.assertEquals(2, targetModsFile.getModifications().size());
+ }
+ for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+ Assert.assertFalse(unseqResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(unseqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(unseqResource.getModFile().exists());
+ }
+ Assert.assertFalse(mergingModsFile.exists());
+ Assert.assertFalse(logFile.exists());
+ }
+
+ @Test
+ public void testRecoverWithAllSourceFilesLost() throws IOException,
MetadataException {
+ for (TsFileResource resource : sourceSeqFiles) {
+ File targetFile = modifyTsFileNameUnseqMergCnt(resource.getTsFile());
+ FSFactoryProducer.getFSFactory()
+ .moveFile(new File(resource.getTsFilePath() +
MergeTask.MERGE_SUFFIX), targetFile);
+ FSFactoryProducer.getFSFactory()
+ .moveFile(
+ new File(resource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX),
+ new File(targetFile.getPath() + TsFileResource.RESOURCE_SUFFIX));
+ resource.remove();
+ }
+ for (TsFileResource resource : sourceUnseqFiles) {
+ resource.remove();
+ }
+ RecoverMergeTask recoverMergeTask =
+ new RecoverMergeTask(
+ tsFileManagement,
+ TestConstant.SEQUENCE_DATA_DIR,
+ tsFileManagement::mergeEndAction,
+ "recoverTest",
+ true,
+ "root.sg1");
+ recoverMergeTask.recoverMerge();
+ for (TsFileResource seqResource : tmpSourceSeqFiles) {
+ Assert.assertFalse(seqResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(seqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(new File(seqResource.getTsFilePath() +
MergeTask.MERGE_SUFFIX).exists());
+ File targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+ Assert.assertTrue(targetFile.exists());
+ Assert.assertTrue(new File(targetFile.getPath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(seqResource.getModFile().exists());
+ ModificationFile targetModsFile =
+ new ModificationFile(targetFile.getPath() +
ModificationFile.FILE_SUFFIX);
+ Assert.assertTrue(targetModsFile.exists());
+ Assert.assertEquals(2, targetModsFile.getModifications().size());
+ }
+ for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+ Assert.assertFalse(unseqResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(unseqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(unseqResource.getModFile().exists());
+ }
+ Assert.assertFalse(mergingModsFile.exists());
+ Assert.assertFalse(logFile.exists());
+ }
+
+ @Test
+ public void testRecoverWithAllSourceFilesExist() throws IOException,
MetadataException {
+ RecoverMergeTask recoverMergeTask =
+ new RecoverMergeTask(
+ tsFileManagement,
+ TestConstant.SEQUENCE_DATA_DIR,
+ tsFileManagement::mergeEndAction,
+ "recoverTest",
+ true,
+ "root.sg1");
+ recoverMergeTask.recoverMerge();
+ for (TsFileResource seqResource : tmpSourceSeqFiles) {
+ Assert.assertTrue(seqResource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(seqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(new File(seqResource.getTsFilePath() +
MergeTask.MERGE_SUFFIX).exists());
+ Assert.assertTrue(seqResource.getModFile().exists());
+ File targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+ Assert.assertFalse(targetFile.exists());
+ Assert.assertFalse(new File(targetFile.getPath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ ModificationFile targetModsFile =
+ new ModificationFile(targetFile.getPath() +
ModificationFile.FILE_SUFFIX);
+ Assert.assertFalse(targetModsFile.exists());
+ }
+ for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+ Assert.assertTrue(unseqResource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(unseqResource.getTsFilePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(unseqResource.getModFile().exists());
+ }
+ Assert.assertFalse(mergingModsFile.exists());
+ Assert.assertFalse(logFile.exists());
+ }
+
+ private void createFiles() throws IOException, IllegalPathException,
WriteProcessException {
+ // create source seq files
+ for (int i = 0; i < seqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
+ i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ Assert.assertTrue(file.createNewFile());
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setClosed(true);
+ tsFileResource.setMinPlanIndex(i);
+ tsFileResource.setMaxPlanIndex(i);
+ tsFileResource.setVersion(i);
+ tsFileResource.serialize();
+ sourceSeqFiles.add(tsFileResource);
+ tmpSourceSeqFiles.add(new TsFileResource(file));
+ // create mods file
+ Deletion deletion = new Deletion(new PartialPath("root.sg1.d1", "s0"),
1, 0, 100);
+ tsFileResource.getModFile().write(deletion);
+ deletion = new Deletion(new PartialPath("root.sg1.d1", "s0"), 1, 200,
300);
+ tsFileResource.getModFile().write(deletion);
+ tsFileResource.getModFile().close();
+
+ prepareFile(tsFileResource, i * 10, 10, i * 10);
+ }
+
+ // create source unseq files
+ for (int i = seqFileNum; i < seqFileNum + unseqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.UNSEQUENCE_DATA_DIR.concat(
+ i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ Assert.assertTrue(file.createNewFile());
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setClosed(true);
+ tsFileResource.setMinPlanIndex(i);
+ tsFileResource.setMaxPlanIndex(i);
+ tsFileResource.setVersion(i);
+ tsFileResource.serialize();
+ sourceUnseqFiles.add(tsFileResource);
+ tmpSourceUnseqFiles.add(new TsFileResource(file));
+ // create mods file
+ Deletion deletion = new Deletion(new PartialPath("root.sg1.d1", "s0"),
1, 0, 100);
+ tsFileResource.getModFile().write(deletion);
+ deletion = new Deletion(new PartialPath("root.sg1.d1", "s0"), 1, 200,
300);
+ tsFileResource.getModFile().write(deletion);
+ tsFileResource.getModFile().close();
+
+ prepareFile(tsFileResource, i * 10, 5, i * 10);
+ }
+
+ // create .merge files
+ for (int i = 0; i < seqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
+ i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"
+ + MergeTask.MERGE_SUFFIX));
+ Assert.assertTrue(file.createNewFile());
+ TsFileResource tsFileResource = new TsFileResource(file);
+ prepareFile(tsFileResource, i * 10, 10, i * 10);
+ }
+
+ // create merging mods file
+ Deletion deletion = new Deletion(new PartialPath("root.sg1.d1", "s1"), 1,
0, 100);
+ mergingModsFile.write(deletion);
+ deletion = new Deletion(new PartialPath("root.sg1.d1", "s1"), 1, 200,
30000);
+ mergingModsFile.write(deletion);
+ mergingModsFile.close();
+
+ // create log
+ MergeLogger mergeLogger = new MergeLogger(TestConstant.SEQUENCE_DATA_DIR);
+ MergeResource mergeResource = new MergeResource(sourceSeqFiles,
sourceUnseqFiles);
+ mergeLogger.logFiles(mergeResource);
+ mergeLogger.close();
+ }
+
+ private void deleteFiles() {
+ for (TsFileResource seqResource : sourceSeqFiles) {
+ seqResource.remove();
+ File mergeFile = new File(seqResource.getTsFilePath() +
MergeTask.MERGE_SUFFIX);
+ if (mergeFile.exists()) {
+ mergeFile.delete();
+ }
+ }
+ for (TsFileResource seqResource : sourceSeqFiles) {
+ seqResource.remove();
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 18970d7..4633178 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -100,7 +100,7 @@ abstract class MergeTest {
MergeManager.getINSTANCE().stop();
}
- private void prepareSeries() throws MetadataException {
+ protected void prepareSeries() throws MetadataException {
measurementSchemas = new MeasurementSchema[measurementNum];
for (int i = 0; i < measurementNum; i++) {
measurementSchemas[i] =