This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f6008ab0c56bce6d7cb1adbc5e0775e252727690
Author: 张凌哲 <[email protected]>
AuthorDate: Thu Sep 24 12:06:32 2020 +0800

    add device chunk point cache
---
 .../db/engine/cache/FileChunkPointSizeCache.java   | 68 ++++++++++++++++
 .../engine/storagegroup/StorageGroupProcessor.java |  1 +
 .../db/engine/storagegroup/TsFileProcessor.java    | 33 +++++++-
 .../db/engine/storagegroup/TsFileResource.java     |  6 +-
 .../level/LevelTsFileManagement.java               | 92 ++++++++--------------
 .../tsfilemanagement/utils/HotCompactionUtils.java | 20 +++--
 6 files changed, 150 insertions(+), 70 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/cache/FileChunkPointSizeCache.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/cache/FileChunkPointSizeCache.java
new file mode 100644
index 0000000..38ab644
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/cache/FileChunkPointSizeCache.java
@@ -0,0 +1,68 @@
+package org.apache.iotdb.db.engine.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileChunkPointSizeCache {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FileChunkPointSizeCache.class);
+
+  // (absolute path,avg chunk point size)
+  private Map<String, Map<String, Long>> tsfileDeviceChunkPointCache;
+
+  private FileChunkPointSizeCache() {
+    tsfileDeviceChunkPointCache = new HashMap<>();
+  }
+
+  public static FileChunkPointSizeCache getInstance() {
+    return FileChunkPointSizeCacheHolder.INSTANCE;
+  }
+
+  public Map<String, Long> get(File tsfile) {
+    String path = tsfile.getAbsolutePath();
+    return tsfileDeviceChunkPointCache.computeIfAbsent(path, k -> {
+      Map<String, Long> deviceChunkPointMap = new HashMap<>();
+      try {
+        if (tsfile.exists()) {
+          TsFileSequenceReader reader = new TsFileSequenceReader(path);
+          List<Path> pathList = reader.getAllPaths();
+          for (Path sensorPath : pathList) {
+            String device = sensorPath.getDevice();
+            long chunkPointNum = deviceChunkPointMap.getOrDefault(device, 0L);
+            List<ChunkMetadata> chunkMetadataList = 
reader.getChunkMetadataList(sensorPath);
+            for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+              chunkPointNum += chunkMetadata.getNumOfPoints();
+            }
+            deviceChunkPointMap.put(device, chunkPointNum);
+          }
+        } else {
+          logger.info("{} tsfile does not exist", path);
+        }
+      } catch (IOException e) {
+        logger.error(
+            "{} tsfile reader creates error", path, e);
+      }
+      return deviceChunkPointMap;
+    });
+  }
+
+  public void put(String tsfilePath, Map<String, Long> deviceChunkPointMap) {
+    tsfileDeviceChunkPointCache.put(tsfilePath, deviceChunkPointMap);
+  }
+
+  /**
+   * singleton pattern.
+   */
+  private static class FileChunkPointSizeCacheHolder {
+
+    private static final FileChunkPointSizeCache INSTANCE = new 
FileChunkPointSizeCache();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 28783ac..0011532 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1580,6 +1580,7 @@ public class StorageGroupProcessor {
                 tsFileManagement.new 
HotCompactionMergeTask(this::closeHotCompactionMergeCallBack,
                     tsFileProcessor.getTimeRangeId()));
       } catch (RejectedExecutionException | IOException e) {
+        e.printStackTrace();
         this.closeHotCompactionMergeCallBack();
         logger.error("{} hot compaction submit task failed", storageGroupName);
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index bd48984..136b979 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -24,8 +24,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -35,6 +37,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.conf.adapter.CompressionRatio;
 import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
+import org.apache.iotdb.db.engine.cache.FileChunkPointSizeCache;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
@@ -165,11 +168,13 @@ public class TsFileProcessor {
     }
 
     // update start time of this memtable
-    tsFileResource.updateStartTime(insertRowPlan.getDeviceId().getFullPath(), 
insertRowPlan.getTime());
+    tsFileResource
+        .updateStartTime(insertRowPlan.getDeviceId().getFullPath(), 
insertRowPlan.getTime());
     //for sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
     //for unsequence tsfile, we have to update the endTime for each insertion.
     if (!sequence) {
-      tsFileResource.updateEndTime(insertRowPlan.getDeviceId().getFullPath(), 
insertRowPlan.getTime());
+      tsFileResource
+          .updateEndTime(insertRowPlan.getDeviceId().getFullPath(), 
insertRowPlan.getTime());
     }
   }
 
@@ -209,7 +214,8 @@ public class TsFileProcessor {
     }
 
     tsFileResource
-        .updateStartTime(insertTabletPlan.getDeviceId().getFullPath(), 
insertTabletPlan.getTimes()[start]);
+        .updateStartTime(insertTabletPlan.getDeviceId().getFullPath(),
+            insertTabletPlan.getTimes()[start]);
 
     //for sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
     //for unsequence tsfile, we have to update the endTime for each insertion.
@@ -645,8 +651,29 @@ public class TsFileProcessor {
     }
   }
 
+  private void updateDeviceChunkPointSizeCache() {
+    Map<String, Map<String, List<ChunkMetadata>>> 
deviceMeasurementChunkMetadataMap = writer
+        .getMetadatasForQuery();
+    Map<String, Long> deviceChunkPointMap = new HashMap<>();
+    for (Entry<String, Map<String, List<ChunkMetadata>>> 
deviceMeasurementChunkMetadataEntry : deviceMeasurementChunkMetadataMap
+        .entrySet()) {
+      String device = deviceMeasurementChunkMetadataEntry.getKey();
+      long chunkPointNum = 0;
+      for (Entry<String, List<ChunkMetadata>> measurementChunkMetadataEntry : 
deviceMeasurementChunkMetadataEntry
+          .getValue().entrySet()) {
+        for (ChunkMetadata chunkMetadata : 
measurementChunkMetadataEntry.getValue()) {
+          chunkPointNum += chunkMetadata.getNumOfPoints();
+        }
+      }
+      deviceChunkPointMap.put(device, chunkPointNum);
+    }
+    FileChunkPointSizeCache.getInstance()
+        .put(tsFileResource.getTsFile().getAbsolutePath(), 
deviceChunkPointMap);
+  }
+
   private void endFile() throws IOException, TsFileProcessorException {
     long closeStartTime = System.currentTimeMillis();
+    updateDeviceChunkPointSizeCache();
     tsFileResource.serialize();
     writer.endFile();
     tsFileResource.cleanCloseFlag();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 2cbb7b8..f71f83a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -62,7 +62,8 @@ import org.slf4j.LoggerFactory;
 public class TsFileResource {
 
   private static final Logger logger = 
LoggerFactory.getLogger(TsFileResource.class);
-  private static Map<String, String> cachedDevicePool = 
CachedStringPool.getInstance().getCachedPool();
+  private static Map<String, String> cachedDevicePool = 
CachedStringPool.getInstance()
+      .getCachedPool();
 
   // tsfile
   private File file;
@@ -783,7 +784,8 @@ public class TsFileResource {
     }
 
     while (true) {
-      String hardlinkSuffix = TsFileConstant.PATH_SEPARATOR + 
System.currentTimeMillis() + "_" + random.nextLong();
+      String hardlinkSuffix =
+          TsFileConstant.PATH_SEPARATOR + System.currentTimeMillis() + "_" + 
random.nextLong();
       File hardlink = new File(file.getAbsolutePath() + hardlinkSuffix);
 
       try {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
index 0c74be3..c9173f7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
@@ -31,7 +31,6 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -43,17 +42,15 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.cache.FileChunkPointSizeCache;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
 import 
org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger;
 import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils;
 import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,9 +70,9 @@ public class LevelTsFileManagement extends TsFileManagement {
   private final List<List<TsFileResource>> forkedSequenceTsFileResources = new 
ArrayList<>();
   private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = 
new ArrayList<>();
   private long forkedSeqListPointNum = 0;
-  private Map<Path, MeasurementSchema> forkedSeqListPathMeasurementSchemaMap = 
new HashMap<>();
+  private Set<String> forkedSeqListDeviceSet = new HashSet<>();
   private long forkedUnSeqListPointNum = 0;
-  private Map<Path, MeasurementSchema> forkedUnSeqListPathMeasurementSchemaMap 
= new HashMap<>();
+  private Set<String> forkedUnSeqListDeviceSet = new HashSet<>();
 
   public LevelTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
@@ -405,74 +402,55 @@ public class LevelTsFileManagement extends 
TsFileManagement {
   }
 
   @Override
-  public void forkCurrentFileList(long timePartition) throws IOException {
-    Pair<Long, Map<Path, MeasurementSchema>> seqResult = forkTsFileList(
+  public void forkCurrentFileList(long timePartition) {
+    Pair<Long, Set<String>> seqResult = forkTsFileList(
         forkedSequenceTsFileResources,
         sequenceTsFileResources.computeIfAbsent(timePartition, 
this::newSequenceTsFileResources));
     forkedSeqListPointNum = seqResult.left;
-    forkedSeqListPathMeasurementSchemaMap = seqResult.right;
-    Pair<Long, Map<Path, MeasurementSchema>> unSeqResult = forkTsFileList(
+    forkedSeqListDeviceSet = seqResult.right;
+    Pair<Long, Set<String>> unSeqResult = forkTsFileList(
         forkedUnSequenceTsFileResources,
         unSequenceTsFileResources
             .computeIfAbsent(timePartition, 
this::newUnSequenceTsFileResources));
     forkedUnSeqListPointNum = unSeqResult.left;
-    forkedUnSeqListPathMeasurementSchemaMap = unSeqResult.right;
+    forkedUnSeqListDeviceSet = unSeqResult.right;
   }
 
-  private Pair<Long, Map<Path, MeasurementSchema>> forkTsFileList(
+  private Pair<Long, Set<String>> forkTsFileList(
       List<List<TsFileResource>> forkedTsFileResources,
-      List rawTsFileResources) throws IOException {
+      List rawTsFileResources) {
     forkedTsFileResources.clear();
     // just fork part of the TsFile list, controlled by max_merge_chunk_point
     long pointNum = 0;
     // all flush to target file
-    Map<Path, MeasurementSchema> pathMeasurementSchemaMap = new HashMap<>();
+    Set<String> deviceSet = new HashSet<>();
     for (int i = 0; i < maxLevelNum - 1; i++) {
       List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
       Collection<TsFileResource> levelRawTsFileResources = 
(Collection<TsFileResource>) rawTsFileResources
           .get(i);
-      for (TsFileResource tsFileResource : levelRawTsFileResources) {
-        if (tsFileResource.isClosed()) {
-          RestorableTsFileIOWriter writer;
-          try {
-            writer = new RestorableTsFileIOWriter(
-                tsFileResource.getTsFile());
-          } catch (Exception e) {
-            logger.error("[Hot Compaction] {} open writer failed",
-                tsFileResource.getTsFile().getPath(), e);
-            continue;
-          }
-          Map<String, Map<String, List<ChunkMetadata>>> schemaMap = writer
-              .getMetadatasForQuery();
-          for (Entry<String, Map<String, List<ChunkMetadata>>> schemaMapEntry 
: schemaMap
-              .entrySet()) {
-            String device = schemaMapEntry.getKey();
-            for (Entry<String, List<ChunkMetadata>> entry : 
schemaMapEntry.getValue()
-                .entrySet()) {
-              String measurement = entry.getKey();
-              List<ChunkMetadata> chunkMetadataList = entry.getValue();
-              for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-                pointNum += chunkMetadata.getNumOfPoints();
-              }
-              pathMeasurementSchemaMap.computeIfAbsent(new Path(device, 
measurement), k ->
-                  new MeasurementSchema(measurement, 
chunkMetadataList.get(0).getDataType()));
+      synchronized (levelRawTsFileResources) {
+        for (TsFileResource tsFileResource : levelRawTsFileResources) {
+          if (tsFileResource.isClosed()) {
+            Map<String, Long> chunkPointMap = 
FileChunkPointSizeCache.getInstance()
+                .get(tsFileResource.getTsFile());
+            for (Entry<String, Long> deviceChunkPoint : 
chunkPointMap.entrySet()) {
+              deviceSet.add(deviceChunkPoint.getKey());
+              pointNum += deviceChunkPoint.getValue();
             }
           }
-          writer.close();
-          forkedLevelTsFileResources.add(tsFileResource);
-        }
-        if (pathMeasurementSchemaMap.size() > 0
-            && pointNum / pathMeasurementSchemaMap.size() >= maxChunkPointNum) 
{
-          break;
+          if (deviceSet.size() > 0
+              && pointNum / deviceSet.size() >= maxChunkPointNum) {
+            break;
+          }
         }
       }
-      if (pathMeasurementSchemaMap.size() > 0
-          && pointNum / pathMeasurementSchemaMap.size() >= maxChunkPointNum) {
+      if (deviceSet.size() > 0
+          && pointNum / deviceSet.size() >= maxChunkPointNum) {
         break;
       }
       forkedTsFileResources.add(forkedLevelTsFileResources);
     }
-    return new Pair<>(pointNum, pathMeasurementSchemaMap);
+    return new Pair<>(pointNum, deviceSet);
   }
 
   @Override
@@ -488,24 +466,22 @@ public class LevelTsFileManagement extends 
TsFileManagement {
     try {
       logger.info("{} start to filter hot compaction condition", 
storageGroupName);
       long pointNum = sequence ? forkedSeqListPointNum : 
forkedUnSeqListPointNum;
-      Map<Path, MeasurementSchema> pathMeasurementSchemaMap =
-          sequence ? forkedSeqListPathMeasurementSchemaMap
-              : forkedUnSeqListPathMeasurementSchemaMap;
-      logger.info("{} current sg subLevel point num: {}, measurement num: {}", 
storageGroupName,
-          pointNum, pathMeasurementSchemaMap.size());
+      Set<String> deviceSet =
+          sequence ? forkedSeqListDeviceSet : forkedUnSeqListDeviceSet;
+      logger
+          .info("{} current sg subLevel point num: {}, device num: {}", 
storageGroupName, pointNum,
+              deviceSet.size());
       HotCompactionLogger hotCompactionLogger = new 
HotCompactionLogger(storageGroupDir,
           storageGroupName);
-      if (pathMeasurementSchemaMap.size() > 0
-          && pointNum / pathMeasurementSchemaMap.size() > 
IoTDBDescriptor.getInstance().getConfig()
-          .getMergeChunkPointNumberThreshold()) {
+      if (deviceSet.size() > 0 && pointNum / deviceSet.size() > 
IoTDBDescriptor.getInstance()
+          .getConfig().getMergeChunkPointNumberThreshold()) {
         // merge all tsfile to last level
         logger.info("{} merge {} level tsfiles to next level", 
storageGroupName,
             mergeResources.size());
         flushAllFilesToLastLevel(timePartition, mergeResources, 
hotCompactionLogger, sequence);
       } else {
         for (int i = 0; i < maxLevelNum - 1; i++) {
-          if 
(IoTDBDescriptor.getInstance().getConfig().getMaxFileNumInEachLevel() <= 
mergeResources
-              .get(i).size()) {
+          if (maxFileNumInEachLevel <= mergeResources.get(i).size()) {
             for (TsFileResource mergeResource : mergeResources.get(i)) {
               hotCompactionLogger.logFile(SOURCE_NAME, 
mergeResource.getTsFile());
             }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
index b9db31a..1ee6faf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
@@ -58,8 +58,9 @@ public class HotCompactionUtils {
     throw new IllegalStateException("Utility class");
   }
 
-  private static Pair<ChunkMetadata, Chunk> writeSeqChunk(String storageGroup,
-      Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String 
deviceId,
+  private static Pair<ChunkMetadata, Chunk> readSeqChunk(String storageGroup,
+      Map<String, TsFileSequenceReader> tsFileSequenceReaderMap,
+      Map<String, List<String>> tsFileDevicesMap, String deviceId,
       String measurementId,
       List<TsFileResource> levelResources)
       throws IOException {
@@ -69,7 +70,8 @@ public class HotCompactionUtils {
       TsFileSequenceReader reader = 
buildReaderFromTsFileResource(levelResource,
           tsFileSequenceReaderMap,
           storageGroup);
-      if (reader == null || !reader.getAllDevices().contains(deviceId)) {
+      if (reader == null || 
!tsFileDevicesMap.get(levelResource.getTsFile().getAbsolutePath())
+          .contains(deviceId)) {
         continue;
       }
       List<ChunkMetadata> chunkMetadataList = reader
@@ -125,7 +127,8 @@ public class HotCompactionUtils {
   private static void fillDeviceMeasurementMap(Set<String> devices,
       Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap,
       List<TsFileResource> subLevelResources,
-      Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String 
storageGroup)
+      Map<String, TsFileSequenceReader> tsFileSequenceReaderMap,
+      Map<String, List<String>> tsFileDevicesMap, String storageGroup)
       throws IOException {
     for (TsFileResource levelResource : subLevelResources) {
       TsFileSequenceReader reader = 
buildReaderFromTsFileResource(levelResource,
@@ -134,6 +137,8 @@ public class HotCompactionUtils {
       if (reader == null) {
         continue;
       }
+      tsFileDevicesMap
+          .putIfAbsent(levelResource.getTsFile().getAbsolutePath(), 
reader.getAllDevices());
       List<Path> allPaths = reader.getAllPaths();
       Map<String, TSDataType> allMeasurements = reader.getAllMeasurements();
       // device, measurement -> chunk metadata list
@@ -158,10 +163,11 @@ public class HotCompactionUtils {
       Set<String> devices, boolean sequence) throws IOException {
     RestorableTsFileIOWriter writer = new 
RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new 
HashMap<>();
+    Map<String, List<String>> tsFileDevicesMap = new HashMap<>();
     Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap = new 
HashMap<>();
     RateLimiter compactionRateLimiter = 
MergeManager.getINSTANCE().getMergeRateLimiter();
     fillDeviceMeasurementMap(devices, deviceMeasurementMap, tsFileResources,
-        tsFileSequenceReaderMap, storageGroup);
+        tsFileSequenceReaderMap, tsFileDevicesMap, storageGroup);
     if (!sequence) {
       for (Entry<String, Map<String, MeasurementSchema>> 
deviceMeasurementEntry : deviceMeasurementMap
           .entrySet()) {
@@ -199,8 +205,8 @@ public class HotCompactionUtils {
         for (Entry<String, MeasurementSchema> entry : 
deviceMeasurementEntry.getValue()
             .entrySet()) {
           String measurementId = entry.getKey();
-          Pair<ChunkMetadata, Chunk> chunkPair = writeSeqChunk(storageGroup,
-              tsFileSequenceReaderMap, deviceId, measurementId, 
tsFileResources);
+          Pair<ChunkMetadata, Chunk> chunkPair = readSeqChunk(storageGroup,
+              tsFileSequenceReaderMap, tsFileDevicesMap, deviceId, 
measurementId, tsFileResources);
           ChunkMetadata newChunkMetadata = chunkPair.left;
           Chunk newChunk = chunkPair.right;
           if (newChunkMetadata != null && newChunk != null) {

Reply via email to