This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new acca766  [To rel/0.12][IOTDB-2603]Fix compaction recover (#5113)
acca766 is described below

commit acca76652f2d841fad68ec8e6fd427c94c2f5b3a
Author: 周沛辰 <[email protected]>
AuthorDate: Fri Feb 25 09:37:32 2022 +0800

    [To rel/0.12][IOTDB-2603]Fix compaction recover (#5113)
    
    * add handle when some source files lost
    
    * fix recover
    
    * remove useless code in merge recover
    
    * spotless
    
    * serialize target file in recover
    
    * update memory when some source files lost
---
 .../resources/conf/iotdb-engine.properties         |   5 -
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   4 -
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 -
 .../db/engine/compaction/TsFileManagement.java     |  19 +-
 .../db/engine/merge/recover/MergeLogAnalyzer.java  |  23 +-
 .../merge/task/CompactionMergeRecoverTask.java     |  12 +-
 .../db/engine/merge/task/RecoverMergeTask.java     |  86 +++-
 .../engine/storagegroup/StorageGroupProcessor.java |   2 -
 .../iotdb/db/engine/merge/MergeRecoverTest.java    | 436 +++++++++++++++++++++
 .../apache/iotdb/db/engine/merge/MergeTest.java    |   2 +-
 10 files changed, 546 insertions(+), 47 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index edc183f..fef0dec 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -451,11 +451,6 @@ timestamp_precision=ms
 # If you are feeling the rebooting is too slow, set this to false, false by 
default
 # continue_merge_after_reboot=false
 
-# When set to true, all unseq merges becomes full merge (the whole SeqFiles 
are re-written despite how
-# much they are overflowed). This may increase merge overhead depending on how 
much the SeqFiles
-# are overflowed.
-# force_full_merge=true
-
 # How many threads will be set up to perform compaction, 10 by default.
 # Set to 1 when less than or equal to 0.
 # compaction_thread_num=10
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d25ac44..cc46dd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1463,10 +1463,6 @@ public class IoTDBConfig {
     return forceFullMerge;
   }
 
-  void setForceFullMerge(boolean forceFullMerge) {
-    this.forceFullMerge = forceFullMerge;
-  }
-
   public int getCompactionThreadNum() {
     return compactionThreadNum;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index fe4d760..0ff011f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -511,10 +511,6 @@ public class IoTDBDescriptor {
           Long.parseLong(
               properties.getProperty(
                   "compaction_interval", 
Long.toString(conf.getCompactionInterval()))));
-      conf.setForceFullMerge(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "force_full_merge", 
Boolean.toString(conf.isForceFullMerge()))));
       conf.setCompactionThreadNum(
           Integer.parseInt(
               properties.getProperty(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index ccfb35d..3e0f2c8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -82,8 +82,6 @@ public abstract class TsFileManagement {
   protected boolean isMergeExecutedInCurrentTask = false;
 
   protected boolean isForceFullMerge = 
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
-  private final int maxOpenFileNumInEachUnseqCompaction =
-      
IoTDBDescriptor.getInstance().getConfig().getMaxSelectUnseqFileNumInEachUnseqCompaction();
 
   protected ReentrantLock compactionSelectionLock = new ReentrantLock();
 
@@ -336,6 +334,19 @@ public abstract class TsFileManagement {
     seqFile.writeUnlock();
   }
 
+  public void replace(
+      List<TsFileResource> seqResources,
+      List<TsFileResource> unseqResources,
+      List<TsFileResource> targetResources,
+      boolean isTargetSeq)
+      throws IOException {
+    writeLock();
+    removeAll(seqResources, true);
+    removeAll(unseqResources, false);
+    addAll(targetResources, isTargetSeq);
+    writeUnlock();
+  }
+
   private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
     writeLock();
     try {
@@ -360,7 +371,7 @@ public abstract class TsFileManagement {
   }
 
   @SuppressWarnings("squid:S1141")
-  private void updateMergeModification(TsFileResource seqFile) {
+  public void updateMergeModification(TsFileResource seqFile) {
     try {
       // remove old modifications and write modifications generated during 
merge
       seqFile.removeModFile();
@@ -387,7 +398,7 @@ public abstract class TsFileManagement {
     }
   }
 
-  private void removeMergingModification() {
+  public void removeMergingModification() {
     try {
       if (mergingModification != null) {
         mergingModification.remove();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
index 780eead..1203964 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
@@ -75,8 +75,8 @@ public class MergeLogAnalyzer {
     try (BufferedReader bufferedReader = new BufferedReader(new 
FileReader(logFile))) {
       currLine = bufferedReader.readLine();
       if (currLine != null) {
+        status = Status.All_SOURCE_FILES_EXIST;
         analyzeSeqFiles(bufferedReader);
-
         analyzeUnseqFiles(bufferedReader);
       }
     }
@@ -103,7 +103,9 @@ public class MergeLogAnalyzer {
           mergeSeqFiles.add(seqFile);
           // remove to speed-up next iteration
           iterator.remove();
-          currentFileFound = true;
+          if (seqFile.getTsFile().exists()) {
+            currentFileFound = true;
+          }
           break;
         }
       }
@@ -119,7 +121,7 @@ public class MergeLogAnalyzer {
           (System.currentTimeMillis() - startTime));
     }
     if (!allSourceFileExists) {
-      status = Status.MERGE_END;
+      status = Status.SOME_SOURCE_FILES_LOST;
     }
     resource.setSeqFiles(mergeSeqFiles);
   }
@@ -128,7 +130,6 @@ public class MergeLogAnalyzer {
     if (!STR_UNSEQ_FILES.equals(currLine)) {
       return;
     }
-    status = Status.MERGE_START;
     long startTime = System.currentTimeMillis();
     List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
     boolean allSourceFileExists = true;
@@ -142,7 +143,9 @@ public class MergeLogAnalyzer {
           mergeUnseqFiles.add(unseqFile);
           // remove to speed-up next iteration
           iterator.remove();
-          currentFileFound = true;
+          if (unseqFile.getTsFile().exists()) {
+            currentFileFound = true;
+          }
           break;
         }
       }
@@ -158,7 +161,7 @@ public class MergeLogAnalyzer {
           (System.currentTimeMillis() - startTime));
     }
     if (!allSourceFileExists) {
-      status = Status.MERGE_END;
+      status = Status.SOME_SOURCE_FILES_LOST;
     }
     resource.setUnseqFiles(mergeUnseqFiles);
   }
@@ -166,9 +169,9 @@ public class MergeLogAnalyzer {
   public enum Status {
     // almost nothing has been done
     NONE,
-    // at least the files and timeseries to be merged are known
-    MERGE_START,
-    // all the merge files are merged with the origin files and the task is 
almost done
-    MERGE_END
+    // all source files exist
+    All_SOURCE_FILES_EXIST,
+    // some source files lost
+    SOME_SOURCE_FILES_LOST
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
index 27e038f..5771a7a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
@@ -21,14 +21,12 @@ package org.apache.iotdb.db.engine.merge.task;
 
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.List;
 
 public class CompactionMergeRecoverTask implements Runnable {
 
@@ -41,8 +39,6 @@ public class CompactionMergeRecoverTask implements Runnable {
 
   public CompactionMergeRecoverTask(
       TsFileManagement tsFileManagement,
-      List<TsFileResource> seqFiles,
-      List<TsFileResource> unseqFiles,
       String storageGroupSysDir,
       MergeCallback callback,
       String taskName,
@@ -54,13 +50,7 @@ public class CompactionMergeRecoverTask implements Runnable {
     this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
     this.recoverMergeTask =
         new RecoverMergeTask(
-            seqFiles,
-            unseqFiles,
-            storageGroupSysDir,
-            callback,
-            taskName,
-            fullMerge,
-            storageGroupName);
+            tsFileManagement, storageGroupSysDir, callback, taskName, 
fullMerge, storageGroupName);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index a23ee56..5965cb3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -19,19 +19,26 @@
 
 package org.apache.iotdb.db.engine.merge.task;
 
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.merge.recover.MergeLogAnalyzer;
 import org.apache.iotdb.db.engine.merge.recover.MergeLogAnalyzer.Status;
 import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.iotdb.db.engine.storagegroup.TsFileResource.modifyTsFileNameUnseqMergCnt;
+
 /**
  * RecoverMergeTask is an extension of MergeTask, which resumes the last merge 
progress by scanning
  * merge.log using LogAnalyzer and continue the unfinished merge.
@@ -41,17 +48,24 @@ public class RecoverMergeTask extends MergeTask {
   private static final Logger logger = 
LoggerFactory.getLogger(RecoverMergeTask.class);
 
   private MergeLogAnalyzer analyzer;
+  private TsFileManagement tsFileManagement;
 
   public RecoverMergeTask(
-      List<TsFileResource> seqFiles,
-      List<TsFileResource> unseqFiles,
+      TsFileManagement tsfileManagement,
       String storageGroupSysDir,
       MergeCallback callback,
       String taskName,
       boolean fullMerge,
       String storageGroupName) {
     super(
-        seqFiles, unseqFiles, storageGroupSysDir, callback, taskName, 
fullMerge, storageGroupName);
+        tsfileManagement.getTsFileList(true),
+        tsfileManagement.getTsFileList(false),
+        storageGroupSysDir,
+        callback,
+        taskName,
+        fullMerge,
+        storageGroupName);
+    this.tsFileManagement = tsfileManagement;
   }
 
   public void recoverMerge() throws IOException, MetadataException {
@@ -68,19 +82,79 @@ public class RecoverMergeTask extends MergeTask {
       case NONE:
         logFile.delete();
         break;
-      case MERGE_START:
-        resumeAfterFilesLogged();
+      case All_SOURCE_FILES_EXIST:
+        handleWhenAllSourceFilesExist();
+        break;
+      case SOME_SOURCE_FILES_LOST:
+        handleWhenSomeSourceFilesLost();
         break;
       default:
         throw new UnsupportedOperationException(taskName + " found 
unrecognized status " + status);
     }
+    if (logFile.exists()) {
+      logFile.delete();
+    }
     if (logger.isInfoEnabled()) {
       logger.info(
           "{} merge recovery ends after {}ms", taskName, 
(System.currentTimeMillis() - startTime));
     }
   }
 
-  private void resumeAfterFilesLogged() throws IOException {
+  /** Delete .merge file and merging mods file. */
+  private void handleWhenAllSourceFilesExist() throws IOException {
     cleanUp(false);
+    tsFileManagement.removeMergingModification();
+  }
+
+  /**
+   * 1. If target file does not exist, then move .merge file to target file 
and serialize target
+   * resource file. <br>
+   * 2. Append merging modification to target mods file and delete merging 
mods file. <br>
+   * 3. Delete source files and .merge file. <br>
+   * 4. Update resource memory of tsfileManagement. <br>
+   */
+  private void handleWhenSomeSourceFilesLost() throws IOException {
+    List<TsFileResource> targetResouces = new ArrayList<>();
+    for (TsFileResource sourceSeqResource : resource.getSeqFiles()) {
+      File targetFile = 
modifyTsFileNameUnseqMergCnt(sourceSeqResource.getTsFile());
+      TsFileResource targetTsFileResouce = new TsFileResource(targetFile);
+      // move to target file and serialize resource file
+      if (!targetFile.exists()) {
+        // move target file
+        File tmpTargetFile = new File(sourceSeqResource.getTsFilePath() + 
MERGE_SUFFIX);
+        FSFactoryProducer.getFSFactory().moveFile(tmpTargetFile, targetFile);
+
+        // serialize target resource file
+        try (TsFileSequenceReader reader = new 
TsFileSequenceReader(targetFile.getAbsolutePath())) {
+          FileLoaderUtils.updateTsFileResource(reader, targetTsFileResouce);
+        }
+        targetTsFileResouce.serialize();
+        targetResouces.add(targetTsFileResouce);
+      }
+
+      // write merging modifications to new mods file
+      tsFileManagement.updateMergeModification(targetTsFileResouce);
+
+      // delete source seq file
+      sourceSeqResource.remove();
+
+      // delete merge file
+      File mergeFile = new File(sourceSeqResource.getTsFilePath() + 
MERGE_SUFFIX);
+      if (mergeFile.exists()) {
+        mergeFile.delete();
+      }
+    }
+
+    // update memory
+    tsFileManagement.replace(
+        resource.getSeqFiles(), resource.getUnseqFiles(), targetResouces, 
true);
+
+    // delete unseq source files
+    for (TsFileResource unseqResource : resource.getUnseqFiles()) {
+      unseqResource.remove();
+    }
+
+    // delete merging mods file
+    tsFileManagement.removeMergingModification();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 0df5743..971592c 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -517,8 +517,6 @@ public class StorageGroupProcessor {
       CompactionMergeRecoverTask recoverTask =
           new CompactionMergeRecoverTask(
               tsFileManagement,
-              new ArrayList<>(tsFileManagement.getTsFileList(true)),
-              tsFileManagement.getTsFileList(false),
               storageGroupSysDir.getPath(),
               tsFileManagement::mergeEndAction,
               taskName,
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeRecoverTest.java 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeRecoverTest.java
new file mode 100644
index 0000000..21fac51
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeRecoverTest.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.merge;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME;
+import static 
org.apache.iotdb.db.engine.storagegroup.TsFileResource.modifyTsFileNameUnseqMergCnt;
+
+public class MergeRecoverTest extends MergeTest {
+  private List<TsFileResource> sourceSeqFiles = new ArrayList<>();
+  private List<TsFileResource> sourceUnseqFiles = new ArrayList<>();
+  private List<TsFileResource> tmpSourceSeqFiles = new ArrayList<>();
+  private List<TsFileResource> tmpSourceUnseqFiles = new ArrayList<>();
+  private File logFile = new File(TestConstant.SEQUENCE_DATA_DIR, 
MergeLogger.MERGE_LOG_NAME);
+  private final int seqFileNum = 10;
+  private final int unseqFileNum = 5;
+  private final File seqDataDir = new File(TestConstant.SEQUENCE_DATA_DIR);
+  private final File unseqDataDir = new File(TestConstant.UNSEQUENCE_DATA_DIR);
+  private final ModificationFile mergingModsFile =
+      new ModificationFile(
+          TestConstant.SEQUENCE_DATA_DIR + File.separator + 
MERGING_MODIFICATION_FILE_NAME);
+  private TsFileManagement tsFileManagement =
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .getCompactionStrategy()
+          .getTsFileManagement("root.sg1", "0", 
TestConstant.SEQUENCE_DATA_DIR);
+
+  @Before
+  public void setUp() throws IOException, MetadataException, 
WriteProcessException {
+    IoTDB.metaManager.init();
+    Assert.assertTrue(seqDataDir.mkdirs());
+    Assert.assertTrue(unseqDataDir.mkdirs());
+    prepareSeries();
+    createFiles();
+    tsFileManagement.addAll(sourceSeqFiles, true);
+    tsFileManagement.addAll(sourceUnseqFiles, false);
+    tsFileManagement.mergingModification = mergingModsFile;
+    MergeManager.getINSTANCE().start();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    ChunkCache.getInstance().clear();
+    TimeSeriesMetadataCache.getInstance().clear();
+    IoTDB.metaManager.clear();
+    EnvironmentUtils.cleanAllDir();
+    MergeManager.getINSTANCE().stop();
+    deleteFiles();
+  }
+
+  /**
+   * source seq file index: 1~10<br>
+   * source unseq file index: 11~15<br>
+   * deleted file: 1.tsfile <br>
+   * no target file exist
+   */
+  @Test
+  public void testRecoverWithSomeSourceFilesLost() throws IOException, 
MetadataException {
+    sourceSeqFiles.get(0).getTsFile().delete();
+    RecoverMergeTask recoverMergeTask =
+        new RecoverMergeTask(
+            tsFileManagement,
+            TestConstant.SEQUENCE_DATA_DIR,
+            tsFileManagement::mergeEndAction,
+            "recoverTest",
+            true,
+            "root.sg1");
+    recoverMergeTask.recoverMerge();
+    for (TsFileResource seqResource : tmpSourceSeqFiles) {
+      Assert.assertFalse(seqResource.getTsFile().exists());
+      Assert.assertFalse(
+          new File(seqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(new File(seqResource.getTsFilePath() + 
MergeTask.MERGE_SUFFIX).exists());
+      Assert.assertFalse(seqResource.getModFile().exists());
+      File targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+      Assert.assertTrue(targetFile.exists());
+      Assert.assertTrue(new File(targetFile.getPath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      ModificationFile targetModsFile =
+          new ModificationFile(targetFile.getPath() + 
ModificationFile.FILE_SUFFIX);
+      Assert.assertTrue(targetModsFile.exists());
+      Assert.assertEquals(2, targetModsFile.getModifications().size());
+    }
+    for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+      Assert.assertFalse(unseqResource.getTsFile().exists());
+      Assert.assertFalse(
+          new File(unseqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(unseqResource.getModFile().exists());
+    }
+    Assert.assertFalse(mergingModsFile.exists());
+    Assert.assertFalse(logFile.exists());
+  }
+
+  /**
+   * source seq file index: 1~10 <br>
+   * source unseq file index: 11~15 <br>
+   * deleted file: 1.tsfile, 1.tsfile.resource, 2.tsfile <br>
+   * existed target file: 1.tsfile, 1.tsfile.resource, 2.tsfile
+   */
+  @Test
+  public void testRecoverWithSomeSourceFilesLostAndSomeTargetFilesExist()
+      throws IOException, MetadataException {
+    File targetFile = 
modifyTsFileNameUnseqMergCnt(sourceSeqFiles.get(0).getTsFile());
+    
FSFactoryProducer.getFSFactory().moveFile(sourceSeqFiles.get(0).getTsFile(), 
targetFile);
+    FSFactoryProducer.getFSFactory()
+        .moveFile(
+            new File(sourceSeqFiles.get(0).getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX),
+            new File(targetFile.getPath() + TsFileResource.RESOURCE_SUFFIX));
+    targetFile = 
modifyTsFileNameUnseqMergCnt(sourceSeqFiles.get(1).getTsFile());
+    
FSFactoryProducer.getFSFactory().moveFile(sourceSeqFiles.get(1).getTsFile(), 
targetFile);
+    // sg recover will serialize resouce file
+    TsFileResource targetTsFileResouce = new TsFileResource(targetFile);
+    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(targetFile.getAbsolutePath())) {
+      FileLoaderUtils.updateTsFileResource(reader, targetTsFileResouce);
+    }
+    targetTsFileResouce.serialize();
+
+    RecoverMergeTask recoverMergeTask =
+        new RecoverMergeTask(
+            tsFileManagement,
+            TestConstant.SEQUENCE_DATA_DIR,
+            tsFileManagement::mergeEndAction,
+            "recoverTest",
+            true,
+            "root.sg1");
+    recoverMergeTask.recoverMerge();
+    for (TsFileResource seqResource : tmpSourceSeqFiles) {
+      Assert.assertFalse(seqResource.getTsFile().exists());
+      Assert.assertFalse(
+          new File(seqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(new File(seqResource.getTsFilePath() + 
MergeTask.MERGE_SUFFIX).exists());
+      targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+      Assert.assertTrue(targetFile.exists());
+      Assert.assertTrue(new File(targetFile.getPath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(seqResource.getModFile().exists());
+      ModificationFile targetModsFile =
+          new ModificationFile(targetFile.getPath() + 
ModificationFile.FILE_SUFFIX);
+      Assert.assertTrue(targetModsFile.exists());
+      Assert.assertEquals(2, targetModsFile.getModifications().size());
+    }
+    for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+      Assert.assertFalse(unseqResource.getTsFile().exists());
+      Assert.assertFalse(
+          new File(unseqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(unseqResource.getModFile().exists());
+    }
+    Assert.assertFalse(mergingModsFile.exists());
+    Assert.assertFalse(logFile.exists());
+  }
+
+  @Test
+  public void testRecoverWithAllSeqSourceFilesLost() throws IOException, 
MetadataException {
+    for (TsFileResource resource : sourceSeqFiles) {
+      File targetFile = modifyTsFileNameUnseqMergCnt(resource.getTsFile());
+      FSFactoryProducer.getFSFactory()
+          .moveFile(new File(resource.getTsFilePath() + 
MergeTask.MERGE_SUFFIX), targetFile);
+      FSFactoryProducer.getFSFactory()
+          .moveFile(
+              new File(resource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX),
+              new File(targetFile.getPath() + TsFileResource.RESOURCE_SUFFIX));
+      resource.remove();
+    }
+    RecoverMergeTask recoverMergeTask =
+        new RecoverMergeTask(
+            tsFileManagement,
+            TestConstant.SEQUENCE_DATA_DIR,
+            tsFileManagement::mergeEndAction,
+            "recoverTest",
+            true,
+            "root.sg1");
+    recoverMergeTask.recoverMerge();
+    for (TsFileResource seqResource : tmpSourceSeqFiles) {
+      Assert.assertFalse(seqResource.getTsFile().exists());
+      Assert.assertFalse(
+          new File(seqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(new File(seqResource.getTsFilePath() + 
MergeTask.MERGE_SUFFIX).exists());
+      File targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+      Assert.assertTrue(targetFile.exists());
+      Assert.assertTrue(new File(targetFile.getPath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(seqResource.getModFile().exists());
+      ModificationFile targetModsFile =
+          new ModificationFile(targetFile.getPath() + 
ModificationFile.FILE_SUFFIX);
+      Assert.assertTrue(targetModsFile.exists());
+      Assert.assertEquals(2, targetModsFile.getModifications().size());
+    }
+    for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+      Assert.assertFalse(unseqResource.getTsFile().exists());
+      Assert.assertFalse(
+          new File(unseqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(unseqResource.getModFile().exists());
+    }
+    Assert.assertFalse(mergingModsFile.exists());
+    Assert.assertFalse(logFile.exists());
+  }
+
+  @Test
+  public void testRecoverWithAllSourceFilesLost() throws IOException, 
MetadataException {
+    for (TsFileResource resource : sourceSeqFiles) {
+      File targetFile = modifyTsFileNameUnseqMergCnt(resource.getTsFile());
+      FSFactoryProducer.getFSFactory()
+          .moveFile(new File(resource.getTsFilePath() + 
MergeTask.MERGE_SUFFIX), targetFile);
+      FSFactoryProducer.getFSFactory()
+          .moveFile(
+              new File(resource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX),
+              new File(targetFile.getPath() + TsFileResource.RESOURCE_SUFFIX));
+      resource.remove();
+    }
+    for (TsFileResource resource : sourceUnseqFiles) {
+      resource.remove();
+    }
+    RecoverMergeTask recoverMergeTask =
+        new RecoverMergeTask(
+            tsFileManagement,
+            TestConstant.SEQUENCE_DATA_DIR,
+            tsFileManagement::mergeEndAction,
+            "recoverTest",
+            true,
+            "root.sg1");
+    recoverMergeTask.recoverMerge();
+    for (TsFileResource seqResource : tmpSourceSeqFiles) {
+      Assert.assertFalse(seqResource.getTsFile().exists());
+      Assert.assertFalse(
+          new File(seqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(new File(seqResource.getTsFilePath() + 
MergeTask.MERGE_SUFFIX).exists());
+      File targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+      Assert.assertTrue(targetFile.exists());
+      Assert.assertTrue(new File(targetFile.getPath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(seqResource.getModFile().exists());
+      ModificationFile targetModsFile =
+          new ModificationFile(targetFile.getPath() + 
ModificationFile.FILE_SUFFIX);
+      Assert.assertTrue(targetModsFile.exists());
+      Assert.assertEquals(2, targetModsFile.getModifications().size());
+    }
+    for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+      Assert.assertFalse(unseqResource.getTsFile().exists());
+      Assert.assertFalse(
+          new File(unseqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(unseqResource.getModFile().exists());
+    }
+    Assert.assertFalse(mergingModsFile.exists());
+    Assert.assertFalse(logFile.exists());
+  }
+
+  @Test
+  public void testRecoverWithAllSourceFilesExist() throws IOException, 
MetadataException {
+    RecoverMergeTask recoverMergeTask =
+        new RecoverMergeTask(
+            tsFileManagement,
+            TestConstant.SEQUENCE_DATA_DIR,
+            tsFileManagement::mergeEndAction,
+            "recoverTest",
+            true,
+            "root.sg1");
+    recoverMergeTask.recoverMerge();
+    for (TsFileResource seqResource : tmpSourceSeqFiles) {
+      Assert.assertTrue(seqResource.getTsFile().exists());
+      Assert.assertTrue(
+          new File(seqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertFalse(new File(seqResource.getTsFilePath() + 
MergeTask.MERGE_SUFFIX).exists());
+      Assert.assertTrue(seqResource.getModFile().exists());
+      File targetFile = modifyTsFileNameUnseqMergCnt(seqResource.getTsFile());
+      Assert.assertFalse(targetFile.exists());
+      Assert.assertFalse(new File(targetFile.getPath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      ModificationFile targetModsFile =
+          new ModificationFile(targetFile.getPath() + 
ModificationFile.FILE_SUFFIX);
+      Assert.assertFalse(targetModsFile.exists());
+    }
+    for (TsFileResource unseqResource : tmpSourceUnseqFiles) {
+      Assert.assertTrue(unseqResource.getTsFile().exists());
+      Assert.assertTrue(
+          new File(unseqResource.getTsFilePath() + 
TsFileResource.RESOURCE_SUFFIX).exists());
+      Assert.assertTrue(unseqResource.getModFile().exists());
+    }
+    Assert.assertFalse(mergingModsFile.exists());
+    Assert.assertFalse(logFile.exists());
+  }
+
+  private void createFiles() throws IOException, IllegalPathException, 
WriteProcessException {
+    // create source seq files
+    for (int i = 0; i < seqFileNum; i++) {
+      File file =
+          new File(
+              TestConstant.SEQUENCE_DATA_DIR.concat(
+                  i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + ".tsfile"));
+      Assert.assertTrue(file.createNewFile());
+      TsFileResource tsFileResource = new TsFileResource(file);
+      tsFileResource.setClosed(true);
+      tsFileResource.setMinPlanIndex(i);
+      tsFileResource.setMaxPlanIndex(i);
+      tsFileResource.setVersion(i);
+      tsFileResource.serialize();
+      sourceSeqFiles.add(tsFileResource);
+      tmpSourceSeqFiles.add(new TsFileResource(file));
+      // create mods file
+      Deletion deletion = new Deletion(new PartialPath("root.sg1.d1", "s0"), 
1, 0, 100);
+      tsFileResource.getModFile().write(deletion);
+      deletion = new Deletion(new PartialPath("root.sg1.d1", "s0"), 1, 200, 
300);
+      tsFileResource.getModFile().write(deletion);
+      tsFileResource.getModFile().close();
+
+      prepareFile(tsFileResource, i * 10, 10, i * 10);
+    }
+
+    // create source unseq files
+    for (int i = seqFileNum; i < seqFileNum + unseqFileNum; i++) {
+      File file =
+          new File(
+              TestConstant.UNSEQUENCE_DATA_DIR.concat(
+                  i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + ".tsfile"));
+      Assert.assertTrue(file.createNewFile());
+      TsFileResource tsFileResource = new TsFileResource(file);
+      tsFileResource.setClosed(true);
+      tsFileResource.setMinPlanIndex(i);
+      tsFileResource.setMaxPlanIndex(i);
+      tsFileResource.setVersion(i);
+      tsFileResource.serialize();
+      sourceUnseqFiles.add(tsFileResource);
+      tmpSourceUnseqFiles.add(new TsFileResource(file));
+      // create mods file
+      Deletion deletion = new Deletion(new PartialPath("root.sg1.d1", "s0"), 
1, 0, 100);
+      tsFileResource.getModFile().write(deletion);
+      deletion = new Deletion(new PartialPath("root.sg1.d1", "s0"), 1, 200, 
300);
+      tsFileResource.getModFile().write(deletion);
+      tsFileResource.getModFile().close();
+
+      prepareFile(tsFileResource, i * 10, 5, i * 10);
+    }
+
+    // create .merge files
+    for (int i = 0; i < seqFileNum; i++) {
+      File file =
+          new File(
+              TestConstant.SEQUENCE_DATA_DIR.concat(
+                  i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + ".tsfile"
+                      + MergeTask.MERGE_SUFFIX));
+      Assert.assertTrue(file.createNewFile());
+      TsFileResource tsFileResource = new TsFileResource(file);
+      prepareFile(tsFileResource, i * 10, 10, i * 10);
+    }
+
+    // create merging mods file
+    Deletion deletion = new Deletion(new PartialPath("root.sg1.d1", "s1"), 1, 
0, 100);
+    mergingModsFile.write(deletion);
+    deletion = new Deletion(new PartialPath("root.sg1.d1", "s1"), 1, 200, 
30000);
+    mergingModsFile.write(deletion);
+    mergingModsFile.close();
+
+    // create log
+    MergeLogger mergeLogger = new MergeLogger(TestConstant.SEQUENCE_DATA_DIR);
+    MergeResource mergeResource = new MergeResource(sourceSeqFiles, 
sourceUnseqFiles);
+    mergeLogger.logFiles(mergeResource);
+    mergeLogger.close();
+  }
+
+  private void deleteFiles() {
+    for (TsFileResource seqResource : sourceSeqFiles) {
+      seqResource.remove();
+      File mergeFile = new File(seqResource.getTsFilePath() + 
MergeTask.MERGE_SUFFIX);
+      if (mergeFile.exists()) {
+        mergeFile.delete();
+      }
+    }
+    for (TsFileResource seqResource : sourceSeqFiles) {
+      seqResource.remove();
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 18970d7..4633178 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -100,7 +100,7 @@ abstract class MergeTest {
     MergeManager.getINSTANCE().stop();
   }
 
-  private void prepareSeries() throws MetadataException {
+  protected void prepareSeries() throws MetadataException {
     measurementSchemas = new MeasurementSchema[measurementNum];
     for (int i = 0; i < measurementNum; i++) {
       measurementSchemas[i] =

Reply via email to