This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5fd7de3b4fe69e6558cf35c8e9d60660ba5f1bb7
Author: 张凌哲 <[email protected]>
AuthorDate: Mon Oct 19 15:27:09 2020 +0800

    uncomment flush all code
---
 .../level/LevelTsFileManagement.java               | 199 +++++++++------------
 1 file changed, 88 insertions(+), 111 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
index a121871..d0e5d2b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
@@ -88,10 +88,10 @@ public class LevelTsFileManagement extends TsFileManagement 
{
   private final List<List<TsFileResource>> forkedSequenceTsFileResources = new 
ArrayList<>();
   private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = 
new ArrayList<>();
 
-//  private double forkedSeqListPointNum = 0;
-//  private double forkedSeqListMeasurementSize = 0;
-//  private double forkedUnSeqListPointNum = 0;
-//  private double forkedUnSeqListMeasurementSize = 0;
+  private double forkedSeqListPointNum = 0;
+  private double forkedSeqListMeasurementSize = 0;
+  private double forkedUnSeqListPointNum = 0;
+  private double forkedUnSeqListMeasurementSize = 0;
 
   public LevelTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
@@ -441,71 +441,14 @@ public class LevelTsFileManagement extends 
TsFileManagement {
             .computeIfAbsent(timePartition, 
this::newUnSequenceTsFileResources), maxUnseqLevelNum);
   }
 
-//  private Pair<Double, Double> forkTsFileList(
-//      List<List<TsFileResource>> forkedTsFileResources,
-//      List rawTsFileResources, int currMaxLevel) {
-//    forkedTsFileResources.clear();
-//    // just fork part of the TsFile list, controlled by max_merge_chunk_point
-//    long pointNum = 0;
-//    // all flush to target file
-//    ICardinality measurementSet = new HyperLogLog(13);
-//    for (int i = 0; i < currMaxLevel - 1; i++) {
-//      List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
-//      Collection<TsFileResource> levelRawTsFileResources = 
(Collection<TsFileResource>) rawTsFileResources
-//          .get(i);
-//      synchronized (levelRawTsFileResources) {
-//        for (TsFileResource tsFileResource : levelRawTsFileResources) {
-//          if (tsFileResource.isClosed()) {
-//            String path = tsFileResource.getTsFile().getAbsolutePath();
-//            try {
-//              if (tsFileResource.getTsFile().exists()) {
-//                TsFileSequenceReader reader = new TsFileSequenceReader(path);
-//                List<Path> pathList = reader.getAllPaths();
-//                for (Path sensorPath : pathList) {
-//                  measurementSet.offer(sensorPath.getFullPath());
-//                  List<ChunkMetadata> chunkMetadataList = 
reader.getChunkMetadataList(sensorPath);
-//                  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-//                    pointNum += chunkMetadata.getNumOfPoints();
-//                  }
-//                }
-//              } else {
-//                logger.info("{} tsfile does not exist", path);
-//              }
-//            } catch (IOException e) {
-//              logger.error(
-//                  "{} tsfile reader creates error", path, e);
-//            }
-//          }
-//          if (measurementSet.cardinality() > 0
-//              && pointNum / measurementSet.cardinality() >= 
maxChunkPointNum) {
-//            forkedLevelTsFileResources.add(tsFileResource);
-//            break;
-//          }
-//          forkedLevelTsFileResources.add(tsFileResource);
-//        }
-//      }
-//
-//      if (measurementSet.cardinality() > 0
-//          && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
-//        forkedTsFileResources.add(forkedLevelTsFileResources);
-//        break;
-//      }
-//      forkedTsFileResources.add(forkedLevelTsFileResources);
-//    }
-//
-//    // fill in empty file
-//    while (forkedTsFileResources.size() < currMaxLevel) {
-//      List<TsFileResource> emptyForkedLevelTsFileResources = new 
ArrayList<>();
-//      forkedTsFileResources.add(emptyForkedLevelTsFileResources);
-//    }
-//
-//    return new Pair<>((double) pointNum, (double) 
measurementSet.cardinality());
-//  }
-
-  private void forkTsFileList(
+  private Pair<Double, Double> forkTsFileList(
       List<List<TsFileResource>> forkedTsFileResources,
       List rawTsFileResources, int currMaxLevel) {
     forkedTsFileResources.clear();
+    // just fork part of the TsFile list, controlled by max_merge_chunk_point
+    long pointNum = 0;
+    // all flush to target file
+    ICardinality measurementSet = new HyperLogLog(13);
     for (int i = 0; i < currMaxLevel - 1; i++) {
       List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
       Collection<TsFileResource> levelRawTsFileResources = 
(Collection<TsFileResource>) rawTsFileResources
@@ -513,10 +456,41 @@ public class LevelTsFileManagement extends 
TsFileManagement {
       synchronized (levelRawTsFileResources) {
         for (TsFileResource tsFileResource : levelRawTsFileResources) {
           if (tsFileResource.isClosed()) {
+            String path = tsFileResource.getTsFile().getAbsolutePath();
+            try {
+              if (tsFileResource.getTsFile().exists()) {
+                TsFileSequenceReader reader = new TsFileSequenceReader(path);
+                List<Path> pathList = reader.getAllPaths();
+                for (Path sensorPath : pathList) {
+                  measurementSet.offer(sensorPath.getFullPath());
+                  List<ChunkMetadata> chunkMetadataList = 
reader.getChunkMetadataList(sensorPath);
+                  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+                    pointNum += chunkMetadata.getNumOfPoints();
+                  }
+                }
+                reader.close();
+              } else {
+                logger.info("{} tsfile does not exist", path);
+              }
+            } catch (IOException e) {
+              logger.error(
+                  "{} tsfile reader creates error", path, e);
+            }
+          }
+          if (measurementSet.cardinality() > 0
+              && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
             forkedLevelTsFileResources.add(tsFileResource);
+            break;
           }
+          forkedLevelTsFileResources.add(tsFileResource);
         }
       }
+
+      if (measurementSet.cardinality() > 0
+          && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
+        forkedTsFileResources.add(forkedLevelTsFileResources);
+        break;
+      }
       forkedTsFileResources.add(forkedLevelTsFileResources);
     }
 
@@ -525,6 +499,8 @@ public class LevelTsFileManagement extends TsFileManagement 
{
       List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>();
       forkedTsFileResources.add(emptyForkedLevelTsFileResources);
     }
+
+    return new Pair<>((double) pointNum, (double) 
measurementSet.cardinality());
   }
 
   @Override
@@ -545,59 +521,60 @@ public class LevelTsFileManagement extends 
TsFileManagement {
     long startTimeMillis = System.currentTimeMillis();
     try {
       logger.info("{} start to filter hot compaction condition", 
storageGroupName);
-//      double pointNum = sequence ? forkedSeqListPointNum : 
forkedUnSeqListPointNum;
-//      double measurementSize =
-//          sequence ? forkedSeqListMeasurementSize : 
forkedUnSeqListMeasurementSize;
-//      logger
-//          .info("{} current sg subLevel point num: {}, approximate 
measurement num: {}",
-//              storageGroupName, pointNum,
-//              measurementSize);
+      double pointNum = sequence ? forkedSeqListPointNum : 
forkedUnSeqListPointNum;
+      double measurementSize =
+          sequence ? forkedSeqListMeasurementSize : 
forkedUnSeqListMeasurementSize;
+      logger
+          .info("{} current sg subLevel point num: {}, approximate measurement 
num: {}",
+              storageGroupName, pointNum,
+              measurementSize);
       HotCompactionLogger hotCompactionLogger = new 
HotCompactionLogger(storageGroupDir,
           storageGroupName);
-//      if (measurementSize > 0 && pointNum / measurementSize >= 
maxChunkPointNum) {
-//        // merge all tsfile to last level
-//        logger.info("{} merge {} level tsfiles to next level", 
storageGroupName,
-//            mergeResources.size());
-//        flushAllFilesToLastLevel(timePartition, mergeResources, 
hotCompactionLogger, sequence);
-//      } else {
-      for (int i = 0; i < currMaxLevel - 1; i++) {
-        if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
-          if (!sequence && i == currMaxLevel - 2) {
-            merge(isForceFullMerge, getTsFileList(true), 
mergeResources.get(i), Long.MAX_VALUE);
-          } else {
-            for (TsFileResource mergeResource : mergeResources.get(i)) {
-              hotCompactionLogger.logFile(SOURCE_NAME, 
mergeResource.getTsFile());
-            }
-            File newLevelFile = 
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
-                i + 1);
-            hotCompactionLogger.logSequence(sequence);
-            hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
-            logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to 
next level vm",
-                storageGroupName, i, mergeResources.get(i).size());
-
-            TsFileResource newResource = new TsFileResource(newLevelFile);
-            HotCompactionUtils
-                .merge(newResource, mergeResources.get(i), storageGroupName, 
hotCompactionLogger,
-                    new HashSet<>(), sequence);
-            writeLock();
-            try {
-              deleteLevelFiles(timePartition, mergeResources.get(i));
-              hotCompactionLogger.logMergeFinish();
-              if (sequence) {
-                sequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
-              } else {
-                unSequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
+      if (measurementSize > 0 && pointNum / measurementSize >= 
maxChunkPointNum) {
+        // merge all tsfile to last level
+        logger.info("{} merge {} level tsfiles to next level", 
storageGroupName,
+            mergeResources.size());
+        flushAllFilesToLastLevel(timePartition, mergeResources, 
hotCompactionLogger, sequence);
+      } else {
+        for (int i = 0; i < currMaxLevel - 1; i++) {
+          if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
+            if (!sequence && i == currMaxLevel - 2) {
+              // do not merge current unseq file level to upper level and just 
merge all of them to seq file
+              merge(isForceFullMerge, getTsFileList(true), 
mergeResources.get(i), Long.MAX_VALUE);
+            } else {
+              for (TsFileResource mergeResource : mergeResources.get(i)) {
+                hotCompactionLogger.logFile(SOURCE_NAME, 
mergeResource.getTsFile());
               }
-              if (mergeResources.size() > i + 1) {
-                mergeResources.get(i + 1).add(newResource);
+              File newLevelFile = 
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
+                  i + 1);
+              hotCompactionLogger.logSequence(sequence);
+              hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
+              logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to 
next level vm",
+                  storageGroupName, i, mergeResources.get(i).size());
+
+              TsFileResource newResource = new TsFileResource(newLevelFile);
+              HotCompactionUtils
+                  .merge(newResource, mergeResources.get(i), storageGroupName, 
hotCompactionLogger,
+                      new HashSet<>(), sequence);
+              writeLock();
+              try {
+                deleteLevelFiles(timePartition, mergeResources.get(i));
+                hotCompactionLogger.logMergeFinish();
+                if (sequence) {
+                  sequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
+                } else {
+                  unSequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
+                }
+                if (mergeResources.size() > i + 1) {
+                  mergeResources.get(i + 1).add(newResource);
+                }
+              } finally {
+                writeUnlock();
               }
-            } finally {
-              writeUnlock();
             }
           }
         }
       }
-//      }
       hotCompactionLogger.close();
       File logFile = FSFactoryProducer.getFSFactory()
           .getFile(storageGroupDir, storageGroupName + 
HOT_COMPACTION_LOG_NAME);

Reply via email to