This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit acc3d49df4ec1b32009e156766c645eb97eddca2
Author: 张凌哲 <[email protected]>
AuthorDate: Fri Sep 25 00:11:29 2020 +0800

    upgrade merge method
---
 .../tsfilemanagement/utils/HotCompactionUtils.java | 163 ++++++++++-----------
 1 file changed, 78 insertions(+), 85 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
index 1ee6faf..d669aae 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
@@ -23,8 +23,11 @@ import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair;
 
 import com.google.common.util.concurrent.RateLimiter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -37,7 +40,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
 import org.apache.iotdb.tsfile.read.reader.IChunkReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -58,29 +60,14 @@ public class HotCompactionUtils {
     throw new IllegalStateException("Utility class");
   }
 
-  private static Pair<ChunkMetadata, Chunk> readSeqChunk(String storageGroup,
-      Map<String, TsFileSequenceReader> tsFileSequenceReaderMap,
-      Map<String, List<String>> tsFileDevicesMap, String deviceId,
-      String measurementId,
-      List<TsFileResource> levelResources)
-      throws IOException {
+  private static Pair<ChunkMetadata, Chunk> readSeqChunk(
+      Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException {
     ChunkMetadata newChunkMetadata = null;
     Chunk newChunk = null;
-    for (TsFileResource levelResource : levelResources) {
-      TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource,
-          tsFileSequenceReaderMap,
-          storageGroup);
-      if (reader == null || !tsFileDevicesMap.get(levelResource.getTsFile().getAbsolutePath())
-          .contains(deviceId)) {
-        continue;
-      }
-      List<ChunkMetadata> chunkMetadataList = reader
-          .getChunkMetadataList(new Path(deviceId, measurementId));
-      if (chunkMetadataList == null) {
-        continue;
-      }
-      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-        Chunk chunk = reader.readMemChunk(chunkMetadata);
+    for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry : readerChunkMetadataMap
+        .entrySet()) {
+      for (ChunkMetadata chunkMetadata : entry.getValue()) {
+        Chunk chunk = entry.getKey().readMemChunk(chunkMetadata);
         if (newChunkMetadata == null) {
           newChunkMetadata = chunkMetadata;
           newChunk = chunk;
@@ -93,20 +80,14 @@ public class HotCompactionUtils {
     return new Pair<>(newChunkMetadata, newChunk);
   }
 
-  private static long readUnseqChunk(String storageGroup,
-      Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String deviceId, long maxVersion,
-      String measurementId,
-      Map<Long, TimeValuePair> timeValuePairMap, List<TsFileResource> levelResources)
+  private static long readUnseqChunk(
+      Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap, long maxVersion,
+      Map<Long, TimeValuePair> timeValuePairMap)
       throws IOException {
-    for (TsFileResource levelResource : levelResources) {
-      TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource,
-          tsFileSequenceReaderMap,
-          storageGroup);
-      if (reader == null) {
-        continue;
-      }
-      List<ChunkMetadata> chunkMetadataList = reader
-          .getChunkMetadataList(new Path(deviceId, measurementId));
+    for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry : readerChunkMetadataMap
+        .entrySet()) {
+      TsFileSequenceReader reader = entry.getKey();
+      List<ChunkMetadata> chunkMetadataList = entry.getValue();
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
         maxVersion = Math.max(chunkMetadata.getVersion(), maxVersion);
         IChunkReader chunkReader = new ChunkReaderByTimestamp(
@@ -124,9 +105,7 @@ public class HotCompactionUtils {
     return maxVersion;
   }
 
-  private static void fillDeviceMeasurementMap(Set<String> devices,
-      Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap,
-      List<TsFileResource> subLevelResources,
+  private static void fillTsFileDevicesMap(List<TsFileResource> subLevelResources,
       Map<String, TsFileSequenceReader> tsFileSequenceReaderMap,
       Map<String, List<String>> tsFileDevicesMap, String storageGroup)
       throws IOException {
@@ -139,20 +118,6 @@ public class HotCompactionUtils {
       }
       tsFileDevicesMap
           .putIfAbsent(levelResource.getTsFile().getAbsolutePath(), reader.getAllDevices());
-      List<Path> allPaths = reader.getAllPaths();
-      Map<String, TSDataType> allMeasurements = reader.getAllMeasurements();
-      // device, measurement -> chunk metadata list
-      for (Path path : allPaths) {
-        if (devices.contains(path.getDevice())) {
-          continue;
-        }
-        Map<String, MeasurementSchema> measurementSchemaMap = deviceMeasurementMap
-            .computeIfAbsent(path.getDevice(), k -> new HashMap<>());
-
-        // measurement, chunk metadata list
-        measurementSchemaMap.computeIfAbsent(path.getMeasurement(), k ->
-            new MeasurementSchema(k, allMeasurements.get(path.getMeasurement())));
-      }
     }
   }
 
@@ -166,47 +131,75 @@ public class HotCompactionUtils {
     Map<String, List<String>> tsFileDevicesMap = new HashMap<>();
     Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap = new HashMap<>();
     RateLimiter compactionRateLimiter = MergeManager.getINSTANCE().getMergeRateLimiter();
-    fillDeviceMeasurementMap(devices, deviceMeasurementMap, tsFileResources,
-        tsFileSequenceReaderMap, tsFileDevicesMap, storageGroup);
-    if (!sequence) {
-      for (Entry<String, Map<String, MeasurementSchema>> deviceMeasurementEntry : deviceMeasurementMap
-          .entrySet()) {
-        String deviceId = deviceMeasurementEntry.getKey();
-        writer.startChunkGroup(deviceId);
-        long maxVersion = Long.MIN_VALUE;
-        for (Entry<String, MeasurementSchema> entry : deviceMeasurementEntry.getValue()
+    fillTsFileDevicesMap(tsFileResources, tsFileSequenceReaderMap, tsFileDevicesMap, storageGroup);
+    for (String device : devices) {
+      // sort chunkMeta by measurement
+      Map<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> measurementChunkMetadataMap = new HashMap<>();
+      writer.startChunkGroup(device);
+      for (TsFileResource levelResource : tsFileResources) {
+        TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource,
+            tsFileSequenceReaderMap, storageGroup);
+        Map<String, List<ChunkMetadata>> chunkMetadataMap = reader
+            .readChunkMetadataInDevice(device);
+        for (Entry<String, List<ChunkMetadata>> entry : chunkMetadataMap.entrySet()) {
+          for (ChunkMetadata chunkMetadata : entry.getValue()) {
+            Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap;
+            String measurementUid = chunkMetadata.getMeasurementUid();
+            if (measurementChunkMetadataMap.containsKey(measurementUid)) {
+              readerChunkMetadataMap = measurementChunkMetadataMap.get(measurementUid);
+            } else {
+              readerChunkMetadataMap = new HashMap<>();
+            }
+            List<ChunkMetadata> chunkMetadataList;
+            if (readerChunkMetadataMap.containsKey(reader)) {
+              chunkMetadataList = readerChunkMetadataMap.get(reader);
+            } else {
+              chunkMetadataList = new ArrayList<>();
+            }
+            chunkMetadataList.add(chunkMetadata);
+            readerChunkMetadataMap.put(reader, chunkMetadataList);
+            measurementChunkMetadataMap
+                .put(chunkMetadata.getMeasurementUid(), readerChunkMetadataMap);
+          }
+        }
+      }
+      if (!sequence) {
+        for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap
             .entrySet()) {
-          String measurementId = entry.getKey();
+          long maxVersion = Long.MIN_VALUE;
           Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
-          maxVersion = readUnseqChunk(storageGroup, tsFileSequenceReaderMap, deviceId,
-              maxVersion, measurementId, timeValuePairMap, tsFileResources);
-          IChunkWriter chunkWriter = new ChunkWriterImpl(entry.getValue());
+          maxVersion = readUnseqChunk(entry.getValue(), maxVersion, timeValuePairMap);
+          Iterator<List<ChunkMetadata>> chunkMetadataListIterator = entry.getValue().values()
+              .iterator();
+          if (!chunkMetadataListIterator.hasNext()) {
+            continue;
+          }
+          List<ChunkMetadata> chunkMetadataList = chunkMetadataListIterator.next();
+          if (chunkMetadataList.size() <= 0) {
+            continue;
+          }
+          IChunkWriter chunkWriter = new ChunkWriterImpl(
+              new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType()));
           for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
             writeTVPair(timeValuePair, chunkWriter);
-            targetResource.updateStartTime(deviceId, timeValuePair.getTimestamp());
-            targetResource.updateEndTime(deviceId, timeValuePair.getTimestamp());
+            targetResource.updateStartTime(device, timeValuePair.getTimestamp());
+            targetResource.updateEndTime(device, timeValuePair.getTimestamp());
           }
           // wait for limit write
           MergeManager
               .mergeRateLimiterAcquire(compactionRateLimiter, chunkWriter.getCurrentChunkSize());
           chunkWriter.writeToFileWriter(writer);
+          writer.writeVersion(maxVersion);
+          writer.endChunkGroup();
+          if (hotCompactionLogger != null) {
+            hotCompactionLogger.logDevice(device, writer.getPos());
+          }
         }
-        writer.writeVersion(maxVersion);
-        writer.endChunkGroup();
-        if (hotCompactionLogger != null) {
-          hotCompactionLogger.logDevice(deviceId, writer.getPos());
-        }
-      }
-    } else {
-      for (Entry<String, Map<String, MeasurementSchema>> deviceMeasurementEntry : deviceMeasurementMap
-          .entrySet()) {
-        String deviceId = deviceMeasurementEntry.getKey();
-        writer.startChunkGroup(deviceId);
-        for (Entry<String, MeasurementSchema> entry : deviceMeasurementEntry.getValue()
+      } else {
+        for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap
             .entrySet()) {
-          String measurementId = entry.getKey();
-          Pair<ChunkMetadata, Chunk> chunkPair = readSeqChunk(storageGroup,
-              tsFileSequenceReaderMap, tsFileDevicesMap, deviceId, measurementId, tsFileResources);
+          Pair<ChunkMetadata, Chunk> chunkPair = readSeqChunk(
+              entry.getValue());
           ChunkMetadata newChunkMetadata = chunkPair.left;
           Chunk newChunk = chunkPair.right;
           if (newChunkMetadata != null && newChunk != null) {
@@ -214,13 +207,13 @@ public class HotCompactionUtils {
             MergeManager.mergeRateLimiterAcquire(compactionRateLimiter,
                 (long)newChunk.getHeader().getDataSize() + newChunk.getData().position());
             writer.writeChunk(newChunk, newChunkMetadata);
-            targetResource.updateStartTime(deviceId, newChunkMetadata.getStartTime());
-            targetResource.updateEndTime(deviceId, newChunkMetadata.getEndTime());
+            targetResource.updateStartTime(device, newChunkMetadata.getStartTime());
+            targetResource.updateEndTime(device, newChunkMetadata.getEndTime());
           }
         }
         writer.endChunkGroup();
         if (hotCompactionLogger != null) {
-          hotCompactionLogger.logDevice(deviceId, writer.getPos());
+          hotCompactionLogger.logDevice(device, writer.getPos());
         }
       }
     }

Reply via email to