This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3a601a2463e866a0d984420d4c91ca95c23cb01d
Author: 张凌哲 <[email protected]>
AuthorDate: Fri Sep 25 01:13:31 2020 +0800

    fix bug
---
 .../tsfilemanagement/utils/HotCompactionUtils.java | 39 ++++++++++++++--------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
index d669aae..2df848f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -105,10 +106,10 @@ public class HotCompactionUtils {
     return maxVersion;
   }
 
-  private static void fillTsFileDevicesMap(List<TsFileResource> 
subLevelResources,
-      Map<String, TsFileSequenceReader> tsFileSequenceReaderMap,
-      Map<String, List<String>> tsFileDevicesMap, String storageGroup)
+  private static Set<String> getTsFileDevicesSet(List<TsFileResource> 
subLevelResources,
+      Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String 
storageGroup)
       throws IOException {
+    Set<String> tsFileDevicesSet = new HashSet<>();
     for (TsFileResource levelResource : subLevelResources) {
       TsFileSequenceReader reader = 
buildReaderFromTsFileResource(levelResource,
           tsFileSequenceReaderMap,
@@ -116,11 +117,20 @@ public class HotCompactionUtils {
       if (reader == null) {
         continue;
       }
-      tsFileDevicesMap
-          .putIfAbsent(levelResource.getTsFile().getAbsolutePath(), 
reader.getAllDevices());
+      tsFileDevicesSet.addAll(reader.getAllDevices());
     }
+    return tsFileDevicesSet;
   }
 
+  /**
+   *
+   * @param targetResource the target resource to be merged to
+   * @param tsFileResources the source resource to be merged
+   * @param storageGroup the storage group name
+   * @param hotCompactionLogger the logger
+   * @param devices the devices to be skipped(used by recover)
+   * @throws IOException
+   */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public static void merge(TsFileResource targetResource,
       List<TsFileResource> tsFileResources, String storageGroup,
@@ -128,14 +138,16 @@ public class HotCompactionUtils {
       Set<String> devices, boolean sequence) throws IOException {
     RestorableTsFileIOWriter writer = new 
RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new 
HashMap<>();
-    Map<String, List<String>> tsFileDevicesMap = new HashMap<>();
-    Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap = new 
HashMap<>();
     RateLimiter compactionRateLimiter = 
MergeManager.getINSTANCE().getMergeRateLimiter();
-    fillTsFileDevicesMap(tsFileResources, tsFileSequenceReaderMap, 
tsFileDevicesMap, storageGroup);
-    for (String device : devices) {
+    Set<String> tsFileDevicesMap = getTsFileDevicesSet(tsFileResources, 
tsFileSequenceReaderMap,
+        storageGroup);
+    for (String device : tsFileDevicesMap) {
+      if (devices.contains(device)) {
+        continue;
+      }
+      writer.startChunkGroup(device);
       // sort chunkMeta by measurement
       Map<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> 
measurementChunkMetadataMap = new HashMap<>();
-      writer.startChunkGroup(device);
       for (TsFileResource levelResource : tsFileResources) {
         TsFileSequenceReader reader = 
buildReaderFromTsFileResource(levelResource,
             tsFileSequenceReaderMap, storageGroup);
@@ -148,7 +160,7 @@ public class HotCompactionUtils {
             if (measurementChunkMetadataMap.containsKey(measurementUid)) {
               readerChunkMetadataMap = 
measurementChunkMetadataMap.get(measurementUid);
             } else {
-              readerChunkMetadataMap = new HashMap<>();
+              readerChunkMetadataMap = new LinkedHashMap<>();
             }
             List<ChunkMetadata> chunkMetadataList;
             if (readerChunkMetadataMap.containsKey(reader)) {
@@ -164,9 +176,9 @@ public class HotCompactionUtils {
         }
       }
       if (!sequence) {
+        long maxVersion = Long.MIN_VALUE;
         for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> 
entry : measurementChunkMetadataMap
             .entrySet()) {
-          long maxVersion = Long.MIN_VALUE;
           Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
           maxVersion = readUnseqChunk(entry.getValue(), maxVersion, 
timeValuePairMap);
           Iterator<List<ChunkMetadata>> chunkMetadataListIterator = 
entry.getValue().values()
@@ -189,12 +201,11 @@ public class HotCompactionUtils {
           MergeManager
               .mergeRateLimiterAcquire(compactionRateLimiter, 
chunkWriter.getCurrentChunkSize());
           chunkWriter.writeToFileWriter(writer);
-          writer.writeVersion(maxVersion);
-          writer.endChunkGroup();
           if (hotCompactionLogger != null) {
             hotCompactionLogger.logDevice(device, writer.getPos());
           }
         }
+        writer.endChunkGroup();
       } else {
         for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> 
entry : measurementChunkMetadataMap
             .entrySet()) {

Reply via email to