This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new df6dddc  fix hot compaction read bug (#1876)
df6dddc is described below

commit df6dddc198d13d83952c5ea20c23a7455315eaae
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Tue Oct 27 23:45:28 2020 +0800

    fix hot compaction read bug (#1876)
---
 .../resources/conf/iotdb-engine.properties         |   2 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   2 +-
 .../level/LevelTsFileManagement.java               | 187 +++++----------------
 3 files changed, 45 insertions(+), 146 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 183ba86..82d83f0 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -265,7 +265,7 @@ default_fill_interval=-1
 ### Merge Configurations
 ####################
 # TsFile manage strategy, define use which hot compaction strategy
-# now we have normal_strategy, level_strategy
+# now we have NORMAL_STRATEGY, LEVEL_STRATEGY
 tsfile_manage_strategy=NORMAL_STRATEGY
 
 # Work when tsfile_manage_strategy is level_strategy.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9374ccb..3cc67ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -743,7 +743,7 @@ public class TsFileProcessor {
       Map<String, String> props, QueryContext context,
       List<TsFileResource> tsfileResourcesForQuery) throws IOException, 
MetadataException {
     if (logger.isDebugEnabled()) {
-      logger.debug("{}: {} get flushQueryLock and vmMergeLock read lock", 
storageGroupName,
+      logger.debug("{}: {} get flushQueryLock and hotCompactionMergeLock read 
lock", storageGroupName,
           tsFileResource.getTsFile().getName());
     }
     flushQueryLock.readLock().lock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
index 9e95c63..575e645 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
@@ -72,8 +72,6 @@ public class LevelTsFileManagement extends TsFileManagement {
       .getMaxUnseqLevelNum();
   private final int maxUnseqFileNumInEachLevel = 
IoTDBDescriptor.getInstance().getConfig()
       .getMaxFileNumInEachLevel();
-  private final int maxChunkPointNum = 
IoTDBDescriptor.getInstance().getConfig()
-      .getMergeChunkPointNumberThreshold();
   private final boolean isForceFullMerge = 
IoTDBDescriptor.getInstance().getConfig()
       .isForceFullMerge();
   // First map is partition list; Second list is level list; Third list is 
file list in level;
@@ -82,11 +80,6 @@ public class LevelTsFileManagement extends TsFileManagement {
   private final List<List<TsFileResource>> forkedSequenceTsFileResources = new 
ArrayList<>();
   private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = 
new ArrayList<>();
 
-  private double forkedSeqListPointNum = 0;
-  private double forkedSeqListMeasurementSize = 0;
-  private double forkedUnSeqListPointNum = 0;
-  private double forkedUnSeqListMeasurementSize = 0;
-
   public LevelTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
     clear();
@@ -94,8 +87,8 @@ public class LevelTsFileManagement extends TsFileManagement {
 
   private void deleteLevelFiles(long timePartitionId, 
Collection<TsFileResource> mergeTsFiles) {
     logger.debug("{} [hot compaction] merge starts to delete file", 
storageGroupName);
-    for (TsFileResource vmMergeTsFile : mergeTsFiles) {
-      deleteLevelFile(vmMergeTsFile);
+    for (TsFileResource mergeTsFile : mergeTsFiles) {
+      deleteLevelFile(mergeTsFile);
     }
     for (int i = 0; i < maxLevelNum; i++) {
       
sequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
@@ -119,36 +112,6 @@ public class LevelTsFileManagement extends TsFileManagement {
     }
   }
 
-  private void flushAllFilesToLastLevel(long timePartitionId,
-      List<List<TsFileResource>> currMergeFiles,
-      HotCompactionLogger hotCompactionLogger, boolean sequence) throws 
IOException {
-    TsFileResource sourceFile = currMergeFiles.get(0).get(0);
-    File newTargetFile = createNewTsFileName(sourceFile.getTsFile(),
-        sequence ? (maxLevelNum - 1) : (maxUnseqLevelNum - 1));
-    TsFileResource targetResource = new TsFileResource(newTargetFile);
-    List<TsFileResource> mergeFiles = new ArrayList<>();
-    for (int i = currMergeFiles.size() - 1; i >= 0; i--) {
-      mergeFiles.addAll(sequenceTsFileResources.get(timePartitionId).get(i));
-    }
-    HotCompactionUtils.merge(targetResource, mergeFiles,
-        storageGroupName, hotCompactionLogger, new HashSet<>(), sequence);
-    hotCompactionLogger.logFullMerge();
-    hotCompactionLogger.logSequence(sequence);
-    hotCompactionLogger.logFile(TARGET_NAME, newTargetFile);
-    writeLock();
-    if (sequence) {
-      for (int i = 0; i < maxLevelNum - 1; i++) {
-        deleteLevelFiles(timePartitionId, currMergeFiles.get(i));
-      }
-    } else {
-      for (int i = 0; i < maxUnseqLevelNum - 1; i++) {
-        deleteLevelFiles(timePartitionId, currMergeFiles.get(i));
-      }
-    }
-    writeUnlock();
-    hotCompactionLogger.logMergeFinish();
-  }
-
   @Override
   public List<TsFileResource> getStableTsFileList(boolean sequence) {
     List<TsFileResource> result = new ArrayList<>();
@@ -415,13 +378,13 @@ public class LevelTsFileManagement extends TsFileManagement {
         }
       }
     } catch (IOException e) {
-      logger.error("recover vm error ", e);
+      logger.error("recover level tsfile management error ", e);
     } finally {
       if (logFile.exists()) {
         try {
           Files.delete(logFile.toPath());
         } catch (IOException e) {
-          logger.error("delete vm log file error ", e);
+          logger.error("delete level tsfile management log file error ", e);
         }
       }
     }
@@ -429,88 +392,38 @@ public class LevelTsFileManagement extends TsFileManagement {
 
   @Override
   public void forkCurrentFileList(long timePartition) {
-    Pair<Double, Double> seqStatisticsPair = forkTsFileList(
+    forkTsFileList(
         forkedSequenceTsFileResources,
         sequenceTsFileResources.computeIfAbsent(timePartition, 
this::newSequenceTsFileResources),
         maxLevelNum);
-    forkedSeqListPointNum = seqStatisticsPair.left;
-    forkedSeqListMeasurementSize = seqStatisticsPair.right;
-    Pair<Double, Double> unSeqStatisticsPair = forkTsFileList(
+    forkTsFileList(
         forkedUnSequenceTsFileResources,
         unSequenceTsFileResources
             .computeIfAbsent(timePartition, 
this::newUnSequenceTsFileResources),
         maxUnseqLevelNum);
-    forkedUnSeqListPointNum = unSeqStatisticsPair.left;
-    forkedUnSeqListMeasurementSize = unSeqStatisticsPair.right;
   }
 
-  private Pair<Double, Double> forkTsFileList(
+  private void forkTsFileList(
       List<List<TsFileResource>> forkedTsFileResources,
       List rawTsFileResources, int currMaxLevel) {
     forkedTsFileResources.clear();
-    // just fork part of the TsFile list, controlled by max_merge_chunk_point
-    long pointNum = 0;
-    // all flush to target file
-    ICardinality measurementSet = new HyperLogLog(13);
     for (int i = 0; i < currMaxLevel - 1; i++) {
       List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
       Collection<TsFileResource> levelRawTsFileResources = 
(Collection<TsFileResource>) rawTsFileResources
           .get(i);
-      List<TsFileResource> allCurrLevelTsFileResources = new 
ArrayList<>(levelRawTsFileResources);
-      for (TsFileResource tsFileResource : allCurrLevelTsFileResources) {
+      for (TsFileResource tsFileResource : levelRawTsFileResources) {
         if (tsFileResource.isClosed()) {
-          String path = tsFileResource.getTsFile().getAbsolutePath();
-          if (tsFileResource.getTsFile().exists()) {
-            try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) 
{
-              List<String> devices = reader.getAllDevices();
-              for (String device : devices) {
-                Map<String, List<ChunkMetadata>> 
measurementChunkMetadataListMap = reader
-                    .readChunkMetadataInDevice(device);
-                for (Entry<String, List<ChunkMetadata>> 
measurementChunkMetadataList : measurementChunkMetadataListMap
-                    .entrySet()) {
-                  Path sensorPath = new Path(device, 
measurementChunkMetadataList.getKey());
-                  measurementSet.offer(sensorPath.getFullPath());
-                  List<ChunkMetadata> chunkMetadataList = 
reader.getChunkMetadataList(sensorPath);
-                  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-                    pointNum += chunkMetadata.getNumOfPoints();
-                  }
-                }
-              }
-            } catch (IOException e) {
-              logger.error(
-                  "{} tsfile reader creates error", path, e);
-            }
-          } else {
-            logger.info("{} tsfile does not exist", path);
-          }
-        }
-        forkedLevelTsFileResources.add(tsFileResource);
-        if (measurementSet.cardinality() > 0
-            && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
-          break;
+          forkedLevelTsFileResources.add(tsFileResource);
         }
       }
-
       forkedTsFileResources.add(forkedLevelTsFileResources);
-      if (measurementSet.cardinality() > 0
-          && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
-        break;
-      }
     }
-
-    // fill in empty file
-    while (forkedTsFileResources.size() < currMaxLevel) {
-      List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>();
-      forkedTsFileResources.add(emptyForkedLevelTsFileResources);
-    }
-
-    return new Pair<>((double) pointNum, (double) 
measurementSet.cardinality());
   }
 
   @Override
   protected void merge(long timePartition) {
     merge(forkedSequenceTsFileResources, true, timePartition, maxLevelNum, 
maxFileNumInEachLevel);
-    if (maxUnseqLevelNum <= 1) {
+    if (maxUnseqLevelNum <= 1 && forkedUnSequenceTsFileResources.size() > 0) {
       merge(isForceFullMerge, getTsFileList(true), 
forkedUnSequenceTsFileResources.get(0),
           Long.MAX_VALUE);
     } else {
@@ -525,57 +438,43 @@ public class LevelTsFileManagement extends TsFileManagement {
     long startTimeMillis = System.currentTimeMillis();
     try {
       logger.info("{} start to filter hot compaction condition", 
storageGroupName);
-      double pointNum = sequence ? forkedSeqListPointNum : 
forkedUnSeqListPointNum;
-      double measurementSize =
-          sequence ? forkedSeqListMeasurementSize : 
forkedUnSeqListMeasurementSize;
-      logger
-          .info("{} current sg subLevel point num: {}, approximate measurement 
num: {}",
-              storageGroupName, pointNum,
-              measurementSize);
       HotCompactionLogger hotCompactionLogger = new 
HotCompactionLogger(storageGroupDir,
           storageGroupName);
-      if (measurementSize > 0 && pointNum / measurementSize >= 
maxChunkPointNum) {
-        // merge all tsfile to last level
-        logger.info("{} merge {} level tsfiles to next level", 
storageGroupName,
-            mergeResources.size());
-        flushAllFilesToLastLevel(timePartition, mergeResources, 
hotCompactionLogger, sequence);
-      } else {
-        for (int i = 0; i < currMaxLevel - 1; i++) {
-          if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
-            //level is numbered from 0
-            if (!sequence && i == currMaxLevel - 2) {
-              // do not merge current unseq file level to upper level and just 
merge all of them to seq file
-              merge(isForceFullMerge, getTsFileList(true), 
mergeResources.get(i), Long.MAX_VALUE);
-            } else {
-              for (TsFileResource mergeResource : mergeResources.get(i)) {
-                hotCompactionLogger.logFile(SOURCE_NAME, 
mergeResource.getTsFile());
+      for (int i = 0; i < currMaxLevel - 1; i++) {
+        if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
+          //level is numbered from 0
+          if (!sequence && i == currMaxLevel - 2) {
+            // do not merge current unseq file level to upper level and just 
merge all of them to seq file
+            merge(isForceFullMerge, getTsFileList(true), 
mergeResources.get(i), Long.MAX_VALUE);
+          } else {
+            for (TsFileResource mergeResource : mergeResources.get(i)) {
+              hotCompactionLogger.logFile(SOURCE_NAME, 
mergeResource.getTsFile());
+            }
+            File newLevelFile = 
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
+                i + 1);
+            hotCompactionLogger.logSequence(sequence);
+            hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
+            logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to 
next level",
+                storageGroupName, i, mergeResources.get(i).size());
+
+            TsFileResource newResource = new TsFileResource(newLevelFile);
+            HotCompactionUtils
+                .merge(newResource, mergeResources.get(i), storageGroupName, 
hotCompactionLogger,
+                    new HashSet<>(), sequence);
+            writeLock();
+            try {
+              deleteLevelFiles(timePartition, mergeResources.get(i));
+              hotCompactionLogger.logMergeFinish();
+              if (sequence) {
+                sequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
+              } else {
+                unSequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
               }
-              File newLevelFile = 
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
-                  i + 1);
-              hotCompactionLogger.logSequence(sequence);
-              hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
-              logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to 
next level vm",
-                  storageGroupName, i, mergeResources.get(i).size());
-
-              TsFileResource newResource = new TsFileResource(newLevelFile);
-              HotCompactionUtils
-                  .merge(newResource, mergeResources.get(i), storageGroupName, 
hotCompactionLogger,
-                      new HashSet<>(), sequence);
-              writeLock();
-              try {
-                deleteLevelFiles(timePartition, mergeResources.get(i));
-                hotCompactionLogger.logMergeFinish();
-                if (sequence) {
-                  sequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
-                } else {
-                  unSequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
-                }
-                if (mergeResources.size() > i + 1) {
-                  mergeResources.get(i + 1).add(newResource);
-                }
-              } finally {
-                writeUnlock();
+              if (mergeResources.size() > i + 1) {
+                mergeResources.get(i + 1).add(newResource);
               }
+            } finally {
+              writeUnlock();
             }
           }
         }

Reply via email to