This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c38f6d  Fix unseq compaction file selector conflicts with time partition bug (#2920)
8c38f6d is described below

commit 8c38f6d859f96bafaf8b4935a22fa55225002629
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Wed Mar 31 13:52:05 2021 +0800

    Fix unseq compaction file selector conflicts with time partition bug (#2920)
    
    Co-authored-by: zhanglingzhe <[email protected]>
---
 .../db/engine/compaction/TsFileManagement.java     |  10 +-
 .../level/LevelCompactionTsFileManagement.java     |  44 +++--
 .../no/NoCompactionTsFileManagement.java           | 180 ++++++++++++++++-----
 .../db/engine/merge/task/MergeMultiChunkTask.java  |   1 -
 .../engine/merge/MaxFileMergeFileSelectorTest.java |  37 +++++
 5 files changed, 221 insertions(+), 51 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 6225735..bc734b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -82,9 +82,17 @@ public abstract class TsFileManagement {
     isForceFullMerge = forceFullMerge;
   }
 
-  /** get the TsFile list in sequence */
+  /**
+   * get the TsFile list in sequence, not recommend to use this method, use
+   * getTsFileListByTimePartition instead
+   */
+  @Deprecated
   public abstract List<TsFileResource> getTsFileList(boolean sequence);
 
+  /** get the TsFile list in sequence by time partition */
+  public abstract List<TsFileResource> getTsFileListByTimePartition(
+      boolean sequence, long timePartition);
+
   /** get the TsFile list iterator in sequence */
   public abstract Iterator<TsFileResource> getIterator(boolean sequence);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 9284dfc..86f8148 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -167,24 +167,42 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
     }
   }
 
+  @Deprecated
   @Override
   public List<TsFileResource> getTsFileList(boolean sequence) {
     List<TsFileResource> result = new ArrayList<>();
     if (sequence) {
       synchronized (sequenceTsFileResources) {
-        for (List<SortedSet<TsFileResource>> sequenceTsFileList :
-            sequenceTsFileResources.values()) {
-          for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
-            result.addAll(sequenceTsFileList.get(i));
-          }
+        for (long timePartition : sequenceTsFileResources.keySet()) {
+          result.addAll(getTsFileListByTimePartition(true, timePartition));
         }
       }
     } else {
       synchronized (unSequenceTsFileResources) {
-        for (List<List<TsFileResource>> unSequenceTsFileList : 
unSequenceTsFileResources.values()) {
-          for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
-            result.addAll(unSequenceTsFileList.get(i));
-          }
+        for (long timePartition : unSequenceTsFileResources.keySet()) {
+          result.addAll(getTsFileListByTimePartition(false, timePartition));
+        }
+      }
+    }
+    return result;
+  }
+
+  public List<TsFileResource> getTsFileListByTimePartition(boolean sequence, 
long timePartition) {
+    List<TsFileResource> result = new ArrayList<>();
+    if (sequence) {
+      synchronized (sequenceTsFileResources) {
+        List<SortedSet<TsFileResource>> sequenceTsFileList =
+            sequenceTsFileResources.get(timePartition);
+        for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
+          result.addAll(sequenceTsFileList.get(i));
+        }
+      }
+    } else {
+      synchronized (unSequenceTsFileResources) {
+        List<List<TsFileResource>> unSequenceTsFileList =
+            unSequenceTsFileResources.get(timePartition);
+        for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
+          result.addAll(unSequenceTsFileList.get(i));
         }
       }
     }
@@ -557,7 +575,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
     if (enableUnseqCompaction && unseqLevelNum <= 1 && 
forkedUnSequenceTsFileResources.size() > 0) {
       merge(
           isForceFullMerge,
-          getTsFileList(true),
+          getTsFileListByTimePartition(true, timePartition),
           forkedUnSequenceTsFileResources.get(0),
           Long.MAX_VALUE);
     } else {
@@ -599,7 +617,11 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
             // do not merge current unseq file level to upper level and just 
merge all of them to
             // seq file
             isSeqMerging = false;
-            merge(isForceFullMerge, getTsFileList(true), 
mergeResources.get(i), Long.MAX_VALUE);
+            merge(
+                isForceFullMerge,
+                getTsFileListByTimePartition(true, timePartition),
+                mergeResources.get(i),
+                Long.MAX_VALUE);
           } else {
             compactionLogger = new CompactionLogger(storageGroupDir, 
storageGroupName);
             // log source file list and target file for recover
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 671955a..6d9864d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -26,43 +26,58 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 public class NoCompactionTsFileManagement extends TsFileManagement {
 
   private static final Logger logger = 
LoggerFactory.getLogger(NoCompactionTsFileManagement.class);
   // includes sealed and unsealed sequence TsFiles
-  private TreeSet<TsFileResource> sequenceFileTreeSet =
-      new TreeSet<>(
-          (o1, o2) -> {
-            try {
-              int rangeCompare =
-                  Long.compare(
-                      Long.parseLong(o1.getTsFile().getParentFile().getName()),
-                      
Long.parseLong(o2.getTsFile().getParentFile().getName()));
-              return rangeCompare == 0
-                  ? compareFileName(o1.getTsFile(), o2.getTsFile())
-                  : rangeCompare;
-            } catch (NumberFormatException e) {
-              return compareFileName(o1.getTsFile(), o2.getTsFile());
-            }
-          });
+  private final Map<Long, TreeSet<TsFileResource>> sequenceFileTreeSetMap = 
new TreeMap<>();
 
   // includes sealed and unsealed unSequence TsFiles
-  private List<TsFileResource> unSequenceFileList = new ArrayList<>();
+  private final Map<Long, List<TsFileResource>> unSequenceFileListMap = new 
TreeMap<>();
 
   public NoCompactionTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
   }
 
+  @Deprecated
   @Override
   public List<TsFileResource> getTsFileList(boolean sequence) {
+    List<TsFileResource> result = new ArrayList<>();
     if (sequence) {
-      return new ArrayList<>(sequenceFileTreeSet);
+      synchronized (sequenceFileTreeSetMap) {
+        for (TreeSet<TsFileResource> tsFileResourceTreeSet : 
sequenceFileTreeSetMap.values()) {
+          result.addAll(tsFileResourceTreeSet);
+        }
+      }
     } else {
-      return new ArrayList<>(unSequenceFileList);
+      synchronized (unSequenceFileListMap) {
+        for (List<TsFileResource> tsFileResourceList : 
unSequenceFileListMap.values()) {
+          result.addAll(tsFileResourceList);
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public List<TsFileResource> getTsFileListByTimePartition(boolean sequence, 
long timePartition) {
+    if (sequence) {
+      synchronized (sequenceFileTreeSetMap) {
+        return new 
ArrayList<>(sequenceFileTreeSetMap.getOrDefault(timePartition, new 
TreeSet<>()));
+      }
+    } else {
+      synchronized (unSequenceFileListMap) {
+        return new ArrayList<>(
+            unSequenceFileListMap.getOrDefault(timePartition, 
Collections.emptyList()));
+      }
     }
   }
 
@@ -74,27 +89,79 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
   @Override
   public void remove(TsFileResource tsFileResource, boolean sequence) {
     if (sequence) {
-      sequenceFileTreeSet.remove(tsFileResource);
+      synchronized (sequenceFileTreeSetMap) {
+        TreeSet<TsFileResource> sequenceFileTreeSet =
+            sequenceFileTreeSetMap.get(tsFileResource.getTimePartition());
+        sequenceFileTreeSet.remove(tsFileResource);
+      }
     } else {
-      unSequenceFileList.remove(tsFileResource);
+      synchronized (unSequenceFileListMap) {
+        List<TsFileResource> unSequenceFileList =
+            unSequenceFileListMap.get(tsFileResource.getTimePartition());
+        unSequenceFileList.remove(tsFileResource);
+      }
     }
   }
 
   @Override
   public void removeAll(List<TsFileResource> tsFileResourceList, boolean 
sequence) {
-    if (sequence) {
-      sequenceFileTreeSet.removeAll(tsFileResourceList);
-    } else {
-      unSequenceFileList.removeAll(tsFileResourceList);
+    if (tsFileResourceList.size() > 0) {
+      tsFileResourceList.sort((o1, o2) -> (int) (o1.getTimePartition() - 
o2.getTimePartition()));
+      if (sequence) {
+        synchronized (sequenceFileTreeSetMap) {
+          long currTimePartition = 
tsFileResourceList.get(0).getTimePartition();
+          int startIndex = 0;
+          for (int i = 1; i < tsFileResourceList.size(); i++) {
+            TsFileResource tsFileResource = tsFileResourceList.get(i);
+            if (tsFileResource.getTimePartition() != currTimePartition) {
+              sequenceFileTreeSetMap
+                  .get(currTimePartition)
+                  .removeAll(tsFileResourceList.subList(startIndex, i));
+              currTimePartition = tsFileResource.getTimePartition();
+              startIndex = i;
+            }
+          }
+          sequenceFileTreeSetMap
+              .get(currTimePartition)
+              .removeAll(tsFileResourceList.subList(startIndex, 
tsFileResourceList.size()));
+        }
+      } else {
+        synchronized (unSequenceFileListMap) {
+          long currTimePartition = 
tsFileResourceList.get(0).getTimePartition();
+          int startIndex = 0;
+          for (int i = 1; i < tsFileResourceList.size(); i++) {
+            TsFileResource tsFileResource = tsFileResourceList.get(i);
+            if (tsFileResource.getTimePartition() != currTimePartition) {
+              unSequenceFileListMap
+                  .get(currTimePartition)
+                  .removeAll(tsFileResourceList.subList(startIndex, i));
+              currTimePartition = tsFileResource.getTimePartition();
+              startIndex = i;
+            }
+          }
+          unSequenceFileListMap
+              .get(currTimePartition)
+              .removeAll(tsFileResourceList.subList(startIndex, 
tsFileResourceList.size()));
+        }
+      }
     }
   }
 
   @Override
   public void add(TsFileResource tsFileResource, boolean sequence) {
+    long timePartitionId = tsFileResource.getTimePartition();
     if (sequence) {
-      sequenceFileTreeSet.add(tsFileResource);
+      synchronized (sequenceFileTreeSetMap) {
+        sequenceFileTreeSetMap
+            .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
+            .add(tsFileResource);
+      }
     } else {
-      unSequenceFileList.add(tsFileResource);
+      synchronized (unSequenceFileListMap) {
+        unSequenceFileListMap
+            .computeIfAbsent(timePartitionId, 
this::newUnSequenceTsFileResources)
+            .add(tsFileResource);
+      }
     }
   }
 
@@ -105,44 +172,73 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
 
   @Override
   public void addAll(List<TsFileResource> tsFileResourceList, boolean 
sequence) {
-    if (sequence) {
-      sequenceFileTreeSet.addAll(tsFileResourceList);
-    } else {
-      unSequenceFileList.addAll(tsFileResourceList);
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      add(tsFileResource, sequence);
     }
   }
 
   @Override
   public boolean contains(TsFileResource tsFileResource, boolean sequence) {
     if (sequence) {
-      return sequenceFileTreeSet.contains(tsFileResource);
+      synchronized (sequenceFileTreeSetMap) {
+        return sequenceFileTreeSetMap
+            .getOrDefault(tsFileResource.getTimePartition(), 
newSequenceTsFileResources(0L))
+            .contains(tsFileResource);
+      }
     } else {
-      return unSequenceFileList.contains(tsFileResource);
+      synchronized (unSequenceFileListMap) {
+        return unSequenceFileListMap
+            .getOrDefault(tsFileResource.getTimePartition(), new ArrayList<>())
+            .contains(tsFileResource);
+      }
     }
   }
 
   @Override
   public void clear() {
-    sequenceFileTreeSet.clear();
-    unSequenceFileList.clear();
+    sequenceFileTreeSetMap.clear();
+    unSequenceFileListMap.clear();
   }
 
   @Override
   public boolean isEmpty(boolean sequence) {
     if (sequence) {
-      return sequenceFileTreeSet.isEmpty();
+      synchronized (sequenceFileTreeSetMap) {
+        for (Set<TsFileResource> sequenceFileTreeSet : 
sequenceFileTreeSetMap.values()) {
+          if (!sequenceFileTreeSet.isEmpty()) {
+            return false;
+          }
+        }
+      }
     } else {
-      return unSequenceFileList.isEmpty();
+      synchronized (unSequenceFileListMap) {
+        for (List<TsFileResource> unSequenceFileList : 
unSequenceFileListMap.values()) {
+          if (!unSequenceFileList.isEmpty()) {
+            return false;
+          }
+        }
+      }
     }
+    return true;
   }
 
   @Override
   public int size(boolean sequence) {
+    int result = 0;
     if (sequence) {
-      return sequenceFileTreeSet.size();
+      synchronized (sequenceFileTreeSetMap) {
+        for (Set<TsFileResource> sequenceFileTreeSet : 
sequenceFileTreeSetMap.values()) {
+          result += sequenceFileTreeSet.size();
+        }
+      }
     } else {
-      return unSequenceFileList.size();
+      synchronized (unSequenceFileListMap) {
+        for (List<TsFileResource> unSequenceFileList : 
unSequenceFileListMap.values()) {
+          result += unSequenceFileList.size();
+        }
+      }
     }
+    return result;
   }
 
   @Override
@@ -159,4 +255,12 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
   protected void merge(long timePartition) {
     logger.info("{} no merge logic", storageGroupName);
   }
+
+  private TreeSet<TsFileResource> newSequenceTsFileResources(Long k) {
+    return new TreeSet<>((o1, o2) -> compareFileName(o1.getTsFile(), 
o2.getTsFile()));
+  }
+
+  private List<TsFileResource> newUnSequenceTsFileResources(Long k) {
+    return new ArrayList<>();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 727d9e2..2b4bb4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -553,7 +553,6 @@ public class MergeMultiChunkTask {
     for (int i = 0; i < batchData.length(); i++) {
       long time = batchData.getTimeByIndex(i);
       // merge data in batch and data in unseqReader
-
       boolean overwriteSeqPoint = false;
       // unseq point.time <= sequence point.time, write unseq point
       while (currTimeValuePairs[pathIdx] != null
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
index b77f2ce..14c72d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java
@@ -199,4 +199,41 @@ public class MaxFileMergeFileSelectorTest extends 
MergeTest {
     assertEquals(1, mergeResource.getUnseqFiles().size());
     mergeResource.clear();
   }
+
+  /**
+   * test unseq merge select with the following files: {0seq-0-0-0.tsfile 
0-100 1seq-1-1-0.tsfile
+   * 100-200 2seq-2-2-0.tsfile 200-300 3seq-3-3-0.tsfile 300-400 
4seq-4-4-0.tsfile 400-500}
+   * {10unseq-10-10-0.tsfile 0-101}
+   */
+  @Test
+  public void testFileSelectionAboutLastSeqFile()
+      throws MergeException, IOException, WriteProcessException {
+    File file =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                10
+                    + "unseq"
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 10
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 10
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource largeUnseqTsFileResource = new TsFileResource(file);
+    largeUnseqTsFileResource.setClosed(true);
+    largeUnseqTsFileResource.setMinPlanIndex(10);
+    largeUnseqTsFileResource.setMaxPlanIndex(10);
+    largeUnseqTsFileResource.setVersion(10);
+    prepareFile(largeUnseqTsFileResource, 0, ptNum + 1, 0);
+
+    unseqResources.clear();
+    unseqResources.add(largeUnseqTsFileResource);
+
+    MergeResource resource = new MergeResource(seqResources, unseqResources);
+    IMergeFileSelector mergeFileSelector = new 
MaxFileMergeFileSelector(resource, Long.MAX_VALUE);
+    List[] result = mergeFileSelector.select();
+    assertEquals(2, result[0].size());
+    resource.clear();
+  }
 }

Reply via email to