This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new e34d129  [To rel/0.11] Fix compaction recover list bug to 0.11 (#2452)
e34d129 is described below

commit e34d1297e6515895a0e0b2ab52443b0cb5473873
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Fri Jan 8 21:56:33 2021 +0800

    [To rel/0.11] Fix compaction recover list bug to 0.11 (#2452)
    
    Fix compaction recover list bug
---
 .../db/engine/compaction/TsFileManagement.java     |  5 ++
 .../level/LevelCompactionTsFileManagement.java     | 57 ++++++++++++++++++++--
 .../no/NoCompactionTsFileManagement.java           |  5 ++
 .../engine/storagegroup/StorageGroupProcessor.java | 10 ++--
 .../compaction/LevelCompactionRecoverTest.java     | 10 ++--
 5 files changed, 74 insertions(+), 13 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 096abec..0c728c7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -99,6 +99,11 @@ public abstract class TsFileManagement {
   public abstract void add(TsFileResource tsFileResource, boolean sequence);
 
   /**
+   * add one TsFile to list for recover
+   */
+  public abstract void addRecover(TsFileResource tsFileResource, boolean 
sequence);
+
+  /**
    * add some TsFiles to list
    */
   public abstract void addAll(List<TsFileResource> tsFileResourceList, boolean 
sequence);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 5c4a168..66d7d71 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -81,6 +81,8 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
   private final Map<Long, List<List<TsFileResource>>> 
unSequenceTsFileResources = new ConcurrentSkipListMap<>();
   private final List<List<TsFileResource>> forkedSequenceTsFileResources = new 
ArrayList<>();
   private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = 
new ArrayList<>();
+  private final List<TsFileResource> sequenceRecoverTsFileResources = new 
CopyOnWriteArrayList<>();
+  private final List<TsFileResource> unSequenceRecoverTsFileResources = new 
CopyOnWriteArrayList<>();
 
   public LevelCompactionTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
@@ -240,6 +242,19 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
   }
 
   @Override
+  public void addRecover(TsFileResource tsFileResource, boolean sequence) {
+    if (sequence) {
+      synchronized (sequenceRecoverTsFileResources) {
+        sequenceRecoverTsFileResources.add(tsFileResource);
+      }
+    } else {
+      synchronized (unSequenceTsFileResources) {
+        unSequenceRecoverTsFileResources.add(tsFileResource);
+      }
+    }
+  }
+
+  @Override
   public void addAll(List<TsFileResource> tsFileResourceList, boolean 
sequence) {
     for (TsFileResource tsFileResource : tsFileResourceList) {
       add(tsFileResource, sequence);
@@ -349,7 +364,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
         }
         if (fullMerge) {
           // get tsfile resource from list, as they have been recovered in 
StorageGroupProcessor
-          TsFileResource targetTsFileResource = getTsFileResource(targetFile, 
isSeq);
+          TsFileResource targetTsFileResource = 
getRecoverTsFileResource(targetFile, isSeq);
           long timePartition = targetTsFileResource.getTimePartition();
           RestorableTsFileIOWriter writer = new 
RestorableTsFileIOWriter(target);
           // if not complete compaction, resume merge
@@ -371,7 +386,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
           deleteAllSubLevelFiles(isSeq, timePartition);
         } else {
           // get tsfile resource from list, as they have been recovered in 
StorageGroupProcessor
-          TsFileResource targetResource = getTsFileResource(targetFile, isSeq);
+          TsFileResource targetResource = getRecoverTsFileResource(targetFile, 
isSeq);
           long timePartition = targetResource.getTimePartition();
           List<TsFileResource> sourceTsFileResources = new ArrayList<>();
           for (String file : sourceFileList) {
@@ -392,13 +407,26 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
                 .merge(targetResource, sourceTsFileResources, storageGroupName,
                     compactionLogger, deviceSet,
                     isSeq);
+            // complete compaction and delete source file
+            writeLock();
+            try {
+              int targetLevel = getMergeLevel(targetResource.getTsFile());
+              if (isSeq) {
+                
sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+                sequenceRecoverTsFileResources.clear();
+              } else {
+                
unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+                unSequenceRecoverTsFileResources.clear();
+              }
+              deleteLevelFilesInList(timePartition, sourceTsFileResources, 
level, isSeq);
+            } finally {
+              writeUnlock();
+            }
+            deleteLevelFilesInDisk(sourceTsFileResources);
             compactionLogger.close();
           } else {
             writer.close();
           }
-          // complete compaction and delete source file
-          deleteLevelFilesInDisk(sourceTsFileResources);
-          deleteLevelFilesInList(timePartition, sourceTsFileResources, level, 
isSeq);
         }
       }
     } catch (IOException | IllegalPathException e) {
@@ -611,6 +639,25 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
     return Integer.parseInt(mergeLevelStr);
   }
 
+  private TsFileResource getRecoverTsFileResource(String filePath, boolean 
isSeq)
+      throws IOException {
+    if (isSeq) {
+      for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
+        if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new 
File(filePath).toPath())) {
+          return tsFileResource;
+        }
+      }
+    } else {
+      for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
+        if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new 
File(filePath).toPath())) {
+          return tsFileResource;
+        }
+      }
+    }
+    logger.error("cannot get tsfile resource path: {}", filePath);
+    throw new IOException();
+  }
+
   private TsFileResource getTsFileResource(String filePath, boolean isSeq) 
throws IOException {
     if (isSeq) {
       for (List<SortedSet<TsFileResource>> tsFileResourcesWithLevel : 
sequenceTsFileResources
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 1e130a0..7d3638d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -92,6 +92,11 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
   }
 
   @Override
+  public void addRecover(TsFileResource tsFileResource, boolean sequence) {
+    logger.info("{} do not need to recover", storageGroupName);
+  }
+
+  @Override
   public void addAll(List<TsFileResource> tsFileResourceList, boolean 
sequence) {
     if (sequence) {
       sequenceFileTreeSet.addAll(tsFileResourceList);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b558f2c..201b908 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -588,9 +588,13 @@ public class StorageGroupProcessor {
       try {
         // this tsfile is not zero level, no need to perform redo wal
         if 
(LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0) 
{
-          recoverPerformer.recover(false);
-          tsFileResource.setClosed(true);
-          tsFileManagement.add(tsFileResource, isSeq);
+          writer = recoverPerformer.recover(false);
+          if (writer.hasCrashed()) {
+            tsFileManagement.addRecover(tsFileResource, isSeq);
+          } else {
+            tsFileResource.setClosed(true);
+            tsFileManagement.add(tsFileResource, isSeq);
+          }
           continue;
         } else {
           writer = recoverPerformer.recover(true);
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index 4ee7880..b401c03 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -113,7 +113,7 @@ public class LevelCompactionRecoverTest extends 
LevelCompactionTest {
     CompactionUtils.merge(targetTsFileResource, new 
ArrayList<>(seqResources.subList(0, 3)),
         COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
     compactionLogger.close();
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path = new PartialPath(
@@ -191,7 +191,7 @@ public class LevelCompactionRecoverTest extends 
LevelCompactionTest {
     }
     logStream.close();
 
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path = new PartialPath(
@@ -274,7 +274,7 @@ public class LevelCompactionRecoverTest extends 
LevelCompactionTest {
     out.truncate(Long.parseLong(logs.get(logs.size() - 1).split(" ")[1]) - 1);
     out.close();
 
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path = new PartialPath(
@@ -333,7 +333,7 @@ public class LevelCompactionRecoverTest extends 
LevelCompactionTest {
     CompactionUtils.merge(targetTsFileResource, new 
ArrayList<>(seqResources.subList(0, 3)),
         COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), false);
     compactionLogger.close();
-    levelCompactionTsFileManagement.add(targetTsFileResource, false);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, false);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path = new PartialPath(
@@ -484,7 +484,7 @@ public class LevelCompactionRecoverTest extends 
LevelCompactionTest {
     compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
     CompactionUtils.merge(targetTsFileResource, new 
ArrayList<>(seqResources.subList(0, 3)),
         COMPACTION_TEST_SG, compactionLogger, new HashSet<>(), true);
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     compactionLogger.close();
     levelCompactionTsFileManagement.recover();
     QueryContext context = new QueryContext();

Reply via email to