This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 257fa620b650f0499b31fd9248307989590273ca
Author: 张凌哲 <[email protected]>
AuthorDate: Wed Oct 7 16:23:51 2020 +0800

    comment to fix oom temp
---
 .../level/LevelTsFileManagement.java               | 205 ++++++++++++---------
 1 file changed, 113 insertions(+), 92 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
index 8c5f482..a121871 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
@@ -88,10 +88,10 @@ public class LevelTsFileManagement extends TsFileManagement {
   private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
   private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
 
-  private double forkedSeqListPointNum = 0;
-  private double forkedSeqListMeasurementSize = 0;
-  private double forkedUnSeqListPointNum = 0;
-  private double forkedUnSeqListMeasurementSize = 0;
+//  private double forkedSeqListPointNum = 0;
+//  private double forkedSeqListMeasurementSize = 0;
+//  private double forkedUnSeqListPointNum = 0;
+//  private double forkedUnSeqListMeasurementSize = 0;
 
   public LevelTsFileManagement(String storageGroupName, String storageGroupDir) {
     super(storageGroupName, storageGroupDir);
@@ -431,28 +431,81 @@ public class LevelTsFileManagement extends TsFileManagement {
 
   @Override
   public void forkCurrentFileList(long timePartition) {
-    Pair<Double, Double> seqResult = forkTsFileList(
+    forkTsFileList(
         forkedSequenceTsFileResources,
         sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
         maxLevelNum);
-    forkedSeqListPointNum = seqResult.left;
-    forkedSeqListMeasurementSize = seqResult.right;
-    Pair<Double, Double> unSeqResult = forkTsFileList(
+    forkTsFileList(
         forkedUnSequenceTsFileResources,
         unSequenceTsFileResources
             .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources), maxUnseqLevelNum);
-    forkedUnSeqListPointNum = unSeqResult.left;
-    forkedUnSeqListMeasurementSize = unSeqResult.right;
   }
 
-  private Pair<Double, Double> forkTsFileList(
+//  private Pair<Double, Double> forkTsFileList(
+//      List<List<TsFileResource>> forkedTsFileResources,
+//      List rawTsFileResources, int currMaxLevel) {
+//    forkedTsFileResources.clear();
+//    // just fork part of the TsFile list, controlled by max_merge_chunk_point
+//    long pointNum = 0;
+//    // all flush to target file
+//    ICardinality measurementSet = new HyperLogLog(13);
+//    for (int i = 0; i < currMaxLevel - 1; i++) {
+//      List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
+//      Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
+//          .get(i);
+//      synchronized (levelRawTsFileResources) {
+//        for (TsFileResource tsFileResource : levelRawTsFileResources) {
+//          if (tsFileResource.isClosed()) {
+//            String path = tsFileResource.getTsFile().getAbsolutePath();
+//            try {
+//              if (tsFileResource.getTsFile().exists()) {
+//                TsFileSequenceReader reader = new TsFileSequenceReader(path);
+//                List<Path> pathList = reader.getAllPaths();
+//                for (Path sensorPath : pathList) {
+//                  measurementSet.offer(sensorPath.getFullPath());
+//                  List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath);
+//                  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+//                    pointNum += chunkMetadata.getNumOfPoints();
+//                  }
+//                }
+//              } else {
+//                logger.info("{} tsfile does not exist", path);
+//              }
+//            } catch (IOException e) {
+//              logger.error(
+//                  "{} tsfile reader creates error", path, e);
+//            }
+//          }
+//          if (measurementSet.cardinality() > 0
+//              && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
+//            forkedLevelTsFileResources.add(tsFileResource);
+//            break;
+//          }
+//          forkedLevelTsFileResources.add(tsFileResource);
+//        }
+//      }
+//
+//      if (measurementSet.cardinality() > 0
+//          && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
+//        forkedTsFileResources.add(forkedLevelTsFileResources);
+//        break;
+//      }
+//      forkedTsFileResources.add(forkedLevelTsFileResources);
+//    }
+//
+//    // fill in empty file
+//    while (forkedTsFileResources.size() < currMaxLevel) {
+//      List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>();
+//      forkedTsFileResources.add(emptyForkedLevelTsFileResources);
+//    }
+//
+//    return new Pair<>((double) pointNum, (double) measurementSet.cardinality());
+//  }
+
+  private void forkTsFileList(
       List<List<TsFileResource>> forkedTsFileResources,
       List rawTsFileResources, int currMaxLevel) {
     forkedTsFileResources.clear();
-    // just fork part of the TsFile list, controlled by max_merge_chunk_point
-    long pointNum = 0;
-    // all flush to target file
-    ICardinality measurementSet = new HyperLogLog(13);
     for (int i = 0; i < currMaxLevel - 1; i++) {
       List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
       Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
@@ -460,40 +513,10 @@ public class LevelTsFileManagement extends TsFileManagement {
       synchronized (levelRawTsFileResources) {
         for (TsFileResource tsFileResource : levelRawTsFileResources) {
           if (tsFileResource.isClosed()) {
-            String path = tsFileResource.getTsFile().getAbsolutePath();
-            try {
-              if (tsFileResource.getTsFile().exists()) {
-                TsFileSequenceReader reader = new TsFileSequenceReader(path);
-                List<Path> pathList = reader.getAllPaths();
-                for (Path sensorPath : pathList) {
-                  measurementSet.offer(sensorPath.getFullPath());
-                  List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath);
-                  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-                    pointNum += chunkMetadata.getNumOfPoints();
-                  }
-                }
-              } else {
-                logger.info("{} tsfile does not exist", path);
-              }
-            } catch (IOException e) {
-              logger.error(
-                  "{} tsfile reader creates error", path, e);
-            }
-          }
-          if (measurementSet.cardinality() > 0
-              && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
             forkedLevelTsFileResources.add(tsFileResource);
-            break;
           }
-          forkedLevelTsFileResources.add(tsFileResource);
         }
       }
-
-      if (measurementSet.cardinality() > 0
-          && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
-        forkedTsFileResources.add(forkedLevelTsFileResources);
-        break;
-      }
       forkedTsFileResources.add(forkedLevelTsFileResources);
     }
 
@@ -502,8 +525,6 @@ public class LevelTsFileManagement extends TsFileManagement {
       List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>();
       forkedTsFileResources.add(emptyForkedLevelTsFileResources);
     }
-
-    return new Pair<>((double) pointNum, (double) measurementSet.cardinality());
   }
 
   @Override
@@ -524,59 +545,59 @@ public class LevelTsFileManagement extends TsFileManagement {
     long startTimeMillis = System.currentTimeMillis();
     try {
       logger.info("{} start to filter hot compaction condition", storageGroupName);
-      double pointNum = sequence ? forkedSeqListPointNum : 
forkedUnSeqListPointNum;
-      double measurementSize =
-          sequence ? forkedSeqListMeasurementSize : 
forkedUnSeqListMeasurementSize;
-      logger
-          .info("{} current sg subLevel point num: {}, approximate measurement 
num: {}",
-              storageGroupName, pointNum,
-              measurementSize);
+//      double pointNum = sequence ? forkedSeqListPointNum : 
forkedUnSeqListPointNum;
+//      double measurementSize =
+//          sequence ? forkedSeqListMeasurementSize : 
forkedUnSeqListMeasurementSize;
+//      logger
+//          .info("{} current sg subLevel point num: {}, approximate 
measurement num: {}",
+//              storageGroupName, pointNum,
+//              measurementSize);
       HotCompactionLogger hotCompactionLogger = new 
HotCompactionLogger(storageGroupDir,
           storageGroupName);
-      if (measurementSize > 0 && pointNum / measurementSize >= 
maxChunkPointNum) {
-        // merge all tsfile to last level
-        logger.info("{} merge {} level tsfiles to next level", 
storageGroupName,
-            mergeResources.size());
-        flushAllFilesToLastLevel(timePartition, mergeResources, 
hotCompactionLogger, sequence);
-      } else {
-        for (int i = 0; i < currMaxLevel - 1; i++) {
-          if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
-            if (!sequence && i == currMaxLevel - 2) {
-              merge(isForceFullMerge, getTsFileList(true), 
mergeResources.get(i), Long.MAX_VALUE);
-            } else {
-              for (TsFileResource mergeResource : mergeResources.get(i)) {
-                hotCompactionLogger.logFile(SOURCE_NAME, 
mergeResource.getTsFile());
+//      if (measurementSize > 0 && pointNum / measurementSize >= 
maxChunkPointNum) {
+//        // merge all tsfile to last level
+//        logger.info("{} merge {} level tsfiles to next level", 
storageGroupName,
+//            mergeResources.size());
+//        flushAllFilesToLastLevel(timePartition, mergeResources, 
hotCompactionLogger, sequence);
+//      } else {
+      for (int i = 0; i < currMaxLevel - 1; i++) {
+        if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
+          if (!sequence && i == currMaxLevel - 2) {
+            merge(isForceFullMerge, getTsFileList(true), 
mergeResources.get(i), Long.MAX_VALUE);
+          } else {
+            for (TsFileResource mergeResource : mergeResources.get(i)) {
+              hotCompactionLogger.logFile(SOURCE_NAME, 
mergeResource.getTsFile());
+            }
+            File newLevelFile = 
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
+                i + 1);
+            hotCompactionLogger.logSequence(sequence);
+            hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
+            logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to 
next level vm",
+                storageGroupName, i, mergeResources.get(i).size());
+
+            TsFileResource newResource = new TsFileResource(newLevelFile);
+            HotCompactionUtils
+                .merge(newResource, mergeResources.get(i), storageGroupName, 
hotCompactionLogger,
+                    new HashSet<>(), sequence);
+            writeLock();
+            try {
+              deleteLevelFiles(timePartition, mergeResources.get(i));
+              hotCompactionLogger.logMergeFinish();
+              if (sequence) {
+                sequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
+              } else {
+                unSequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
               }
-              File newLevelFile = 
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
-                  i + 1);
-              hotCompactionLogger.logSequence(sequence);
-              hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
-              logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to 
next level vm",
-                  storageGroupName, i, mergeResources.get(i).size());
-
-              TsFileResource newResource = new TsFileResource(newLevelFile);
-              HotCompactionUtils
-                  .merge(newResource, mergeResources.get(i), storageGroupName, 
hotCompactionLogger,
-                      new HashSet<>(), sequence);
-              writeLock();
-              try {
-                deleteLevelFiles(timePartition, mergeResources.get(i));
-                hotCompactionLogger.logMergeFinish();
-                if (sequence) {
-                  sequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
-                } else {
-                  unSequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
-                }
-                if (mergeResources.size() > i + 1) {
-                  mergeResources.get(i + 1).add(newResource);
-                }
-              } finally {
-                writeUnlock();
+              if (mergeResources.size() > i + 1) {
+                mergeResources.get(i + 1).add(newResource);
               }
+            } finally {
+              writeUnlock();
             }
           }
         }
       }
+//      }
       hotCompactionLogger.close();
       File logFile = FSFactoryProducer.getFSFactory()
           .getFile(storageGroupDir, storageGroupName + 
HOT_COMPACTION_LOG_NAME);

Reply via email to