This is an automated email from the ASF dual-hosted git repository.

bensonchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d87ecc1346 Refactor MultiTsFileDeviceIterator.MeasurementIterator (#12221)
3d87ecc1346 is described below

commit 3d87ecc1346a5b04c08dad3498fb1d14bd82bbe7
Author: shuwenwei <[email protected]>
AuthorDate: Fri Apr 26 15:00:46 2024 +0800

    Refactor MultiTsFileDeviceIterator.MeasurementIterator (#12221)
    
     refactor MultiTsFileDeviceIterator.MeasurementIterator
---
 .../impl/ReadChunkCompactionPerformer.java         |  12 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   | 146 ++++++++++++---------
 .../utils/MultiTsFileDeviceIteratorTest.java       |  70 ++++++++++
 3 files changed, 157 insertions(+), 71 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index b56d6a76235..512d6c12dfc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -175,15 +175,13 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
       MultiTsFileDeviceIterator deviceIterator)
       throws IOException, MetadataException, InterruptedException {
     writer.startChunkGroup(device);
-    MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
-        deviceIterator.iterateNotAlignedSeries(device, true);
+    
MultiTsFileDeviceIterator.MultiTsFileNonAlignedMeasurementMetadataListIterator 
seriesIterator =
+        deviceIterator.iterateNotAlignedSeriesAndChunkMetadataList(device);
     while (seriesIterator.hasNextSeries()) {
       checkThreadInterrupted();
+      String series = seriesIterator.nextSeries();
       // TODO: we can provide a configuration item to enable concurrent 
between each series
-      PartialPath p = CompactionPathUtils.getPath(device, 
seriesIterator.nextSeries());
-      // TODO: seriesIterator needs to be refactor.
-      // This statement must be called before next hasNextSeries() called, or 
it may be trapped in a
-      // dead-loop.
+      PartialPath path = CompactionPathUtils.getPath(device, series);
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList =
           seriesIterator.getMetadataListForCurrentSeries();
       // remove the chunk metadata whose data type not match the data type of 
last chunk
@@ -191,7 +189,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
           filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList);
       SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
           new SingleSeriesCompactionExecutor(
-              p, readerAndChunkMetadataList, writer, targetResource, summary);
+              path, readerAndChunkMetadataList, writer, targetResource, 
summary);
       compactionExecutorOfCurrentTimeSeries.execute();
     }
     writer.endChunkGroup();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 100207823d6..ee0065ff6ae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -326,15 +326,16 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
   }
 
   /**
-   * return MeasurementIterator, who iterates the measurements of not aligned 
device
+   * return MultiTsFileNonAlignedMeasurementMetadataListIterator, who iterates 
the measurements of
+   * not aligned device
    *
-   * @param device the full path of the device to be iterated
+   * @param device the device to be iterated
    * @return measurement iterator of not aligned device
    * @throws IOException if io errors occurred
    */
-  public MeasurementIterator iterateNotAlignedSeries(
-      IDeviceID device, boolean derserializeTimeseriesMetadata) throws 
IOException {
-    return new MeasurementIterator(readerMap, device, 
derserializeTimeseriesMetadata);
+  public MultiTsFileNonAlignedMeasurementMetadataListIterator
+      iterateNotAlignedSeriesAndChunkMetadataList(IDeviceID device) throws 
IOException {
+    return new MultiTsFileNonAlignedMeasurementMetadataListIterator(readerMap, 
device);
   }
 
   /**
@@ -435,34 +436,30 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
   /*
   NonAligned measurement iterator.
    */
-  public class MeasurementIterator {
-    private Map<TsFileResource, TsFileSequenceReader> readerMap;
-    private IDeviceID device;
-    private String currentCompactingSeries = null;
-    private LinkedList<String> seriesInThisIteration = new LinkedList<>();
+  public class MultiTsFileNonAlignedMeasurementMetadataListIterator {
+    private final Map<TsFileResource, TsFileSequenceReader> readerMap;
+    private final IDeviceID device;
+    private final LinkedList<String> seriesInThisIteration = new 
LinkedList<>();
     // tsfile sequence reader -> series -> list<ChunkMetadata>
-    private Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> 
chunkMetadataCacheMap =
-        new HashMap<>();
+    private final Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+        chunkMetadataCacheMap = new HashMap<>();
     // this map cache the chunk metadata list iterator for each tsfile
     // the iterator return a batch of series and all chunk metadata of these 
series in this tsfile
-    private Map<TsFileResource, Iterator<Map<String, List<ChunkMetadata>>>>
+    private final Map<TsFileResource, Iterator<Map<String, 
List<ChunkMetadata>>>>
         chunkMetadataIteratorMap = new HashMap<>();
+    private String currentCompactingSeries = null;
+    private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
+        chunkMetadataMetadataListOfCurrentCompactingSeries;
 
-    private MeasurementIterator(
-        Map<TsFileResource, TsFileSequenceReader> readerMap,
-        IDeviceID device,
-        boolean needDeserializeTimeseries)
-        throws IOException {
+    private MultiTsFileNonAlignedMeasurementMetadataListIterator(
+        Map<TsFileResource, TsFileSequenceReader> readerMap, IDeviceID device) 
throws IOException {
       this.readerMap = readerMap;
       this.device = device;
-
-      if (needDeserializeTimeseries) {
-        for (TsFileResource resource : tsFileResourcesSortedByAsc) {
-          TsFileSequenceReader reader = readerMap.get(resource);
-          chunkMetadataIteratorMap.put(
-              resource, 
reader.getMeasurementChunkMetadataListMapIterator(device));
-          chunkMetadataCacheMap.put(reader, new TreeMap<>());
-        }
+      for (TsFileResource resource : tsFileResourcesSortedByAsc) {
+        TsFileSequenceReader reader = readerMap.get(resource);
+        chunkMetadataIteratorMap.put(
+            resource, 
reader.getMeasurementChunkMetadataListMapIterator(device));
+        chunkMetadataCacheMap.put(reader, new TreeMap<>());
       }
     }
 
@@ -481,47 +478,61 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
     @SuppressWarnings("squid:S3776")
     private boolean collectSeries() {
       String lastSeries = null;
-      List<String> tempCollectedSeries = new ArrayList<>();
+      List<String> collectedSeries = new ArrayList<>();
       for (TsFileResource resource : tsFileResourcesSortedByAsc) {
-        TsFileSequenceReader reader = readerMap.get(resource);
-        Map<String, List<ChunkMetadata>> chunkMetadataListMap = 
chunkMetadataCacheMap.get(reader);
-        if (chunkMetadataListMap.size() == 0) {
-          if (chunkMetadataIteratorMap.get(resource).hasNext()) {
-            chunkMetadataListMap = 
chunkMetadataIteratorMap.get(resource).next();
-            if (chunkMetadataListMap.containsKey("")) {
-              // encounter deleted aligned series, then remove it
-              chunkMetadataListMap.remove("");
-            }
-            chunkMetadataCacheMap.put(reader, chunkMetadataListMap);
-          } else {
-            continue;
-          }
+        Map<String, List<ChunkMetadata>> batchMeasurementChunkMetadataList =
+            getBatchMeasurementChunkMetadataListFromCache(resource);
+        if (batchMeasurementChunkMetadataList.isEmpty()) {
+          continue;
         }
+
         // get the min last series in the current chunk metadata
-        String maxSeries = Collections.max(chunkMetadataListMap.keySet());
+        String maxSeriesOfCurrentBatch =
+            Collections.max(batchMeasurementChunkMetadataList.keySet());
         if (lastSeries == null) {
-          lastSeries = maxSeries;
-        } else {
-          if (maxSeries.compareTo(lastSeries) < 0) {
-            lastSeries = maxSeries;
-          }
+          lastSeries = maxSeriesOfCurrentBatch;
+        } else if (maxSeriesOfCurrentBatch.compareTo(lastSeries) < 0) {
+          lastSeries = maxSeriesOfCurrentBatch;
         }
-        tempCollectedSeries.addAll(chunkMetadataListMap.keySet());
+        collectedSeries.addAll(batchMeasurementChunkMetadataList.keySet());
       }
-      if (!tempCollectedSeries.isEmpty()) {
-        if (!hasRemainingSeries()) {
-          lastSeries = Collections.max(tempCollectedSeries);
-        }
-        String finalLastSeries = lastSeries;
-        List<String> finalCollectedSeriesInThisIteration =
-            tempCollectedSeries.stream()
-                .filter(series -> series.compareTo(finalLastSeries) <= 0)
-                .collect(Collectors.toList());
-        seriesInThisIteration.addAll(finalCollectedSeriesInThisIteration);
-        return true;
-      } else {
+
+      if (collectedSeries.isEmpty()) {
         return false;
       }
+      if (!hasRemainingSeries()) {
+        lastSeries = Collections.max(collectedSeries);
+      }
+
+      String finalLastSeries = lastSeries;
+      seriesInThisIteration.addAll(
+          collectedSeries.stream()
+              .filter(series -> series.compareTo(finalLastSeries) <= 0)
+              .sorted()
+              .distinct()
+              .collect(Collectors.toList()));
+      return true;
+    }
+
+    private Map<String, List<ChunkMetadata>> 
getBatchMeasurementChunkMetadataListFromCache(
+        TsFileResource resource) {
+      TsFileSequenceReader reader = readerMap.get(resource);
+      Map<String, List<ChunkMetadata>> 
cachedBatchMeasurementChunkMetadataListMap =
+          chunkMetadataCacheMap.get(reader);
+      if (!cachedBatchMeasurementChunkMetadataListMap.isEmpty()) {
+        return cachedBatchMeasurementChunkMetadataListMap;
+      }
+      Iterator<Map<String, List<ChunkMetadata>>> 
batchMeasurementChunkMetadataListIterator =
+          chunkMetadataIteratorMap.get(resource);
+      if (!batchMeasurementChunkMetadataListIterator.hasNext()) {
+        return cachedBatchMeasurementChunkMetadataListMap;
+      }
+      Map<String, List<ChunkMetadata>> newBatchMeasurementChunkMetadataListMap 
=
+          batchMeasurementChunkMetadataListIterator.next();
+      // if encounter deleted aligned series, then remove it
+      newBatchMeasurementChunkMetadataListMap.remove("");
+      chunkMetadataCacheMap.put(reader, 
newBatchMeasurementChunkMetadataListMap);
+      return newBatchMeasurementChunkMetadataListMap;
     }
 
     private boolean hasRemainingSeries() {
@@ -537,15 +548,21 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
       return !seriesInThisIteration.isEmpty() || collectSeries();
     }
 
-    public String nextSeries() {
+    public String nextSeries() throws IllegalPathException {
       if (!hasNextSeries()) {
         return null;
       } else {
-        currentCompactingSeries = seriesInThisIteration.removeFirst();
+        chunkMetadataMetadataListOfCurrentCompactingSeries =
+            calculateMetadataListForCurrentSeries();
         return currentCompactingSeries;
       }
     }
 
+    public LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
+        getMetadataListForCurrentSeries() {
+      return chunkMetadataMetadataListOfCurrentCompactingSeries;
+    }
+
     /**
      * Collect all the chunk metadata of current series from the source files.
      *
@@ -556,11 +573,12 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
      * @throws IllegalPathException if path is illegal
      */
     @SuppressWarnings("squid:S1319")
-    public LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
-        getMetadataListForCurrentSeries() throws IllegalPathException {
-      if (currentCompactingSeries == null) {
+    private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
+        calculateMetadataListForCurrentSeries() throws IllegalPathException {
+      if (seriesInThisIteration.isEmpty()) {
         return new LinkedList<>();
       }
+      currentCompactingSeries = seriesInThisIteration.removeFirst();
 
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
           readerAndChunkMetadataForThisSeries = new LinkedList<>();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
index 16ef1be97c2..0e9c3cbb72d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
@@ -40,9 +40,14 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -54,6 +59,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -82,6 +88,70 @@ public class MultiTsFileDeviceIteratorTest extends AbstractCompactionTest {
     Thread.currentThread().setName(oldThreadName);
   }
 
+  @Test
+  public void testMeasurementIterator() throws IOException, MetadataException {
+    TsFileResource resource1 = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource1)) {
+      writer.startChunkGroup("d1");
+      for (int i = 1000; i < 2000; i++) {
+        writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+            "s" + i,
+            new TimeRange[] {new TimeRange(10, 20)},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4);
+      }
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    resource1.setStatusForTest(TsFileResourceStatus.COMPACTING);
+
+    seqResources.add(resource1);
+
+    TsFileResource resource2 = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource2)) {
+      writer.startChunkGroup("d1");
+      for (int i = 1000; i < 5000; i++) {
+        writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+            "s" + i,
+            new TimeRange[] {new TimeRange(30, 40)},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4);
+      }
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    resource2.setStatusForTest(TsFileResourceStatus.COMPACTING);
+
+    seqResources.add(resource2);
+
+    List<String> measurementSet = new ArrayList<>(4000);
+    try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+        new MultiTsFileDeviceIterator(seqResources)) {
+      while (multiTsFileDeviceIterator.hasNextDevice()) {
+        Pair<IDeviceID, Boolean> deviceIsAlignedPair = 
multiTsFileDeviceIterator.nextDevice();
+        IDeviceID device = deviceIsAlignedPair.getLeft();
+        
MultiTsFileDeviceIterator.MultiTsFileNonAlignedMeasurementMetadataListIterator
+            measurementIterator =
+                
multiTsFileDeviceIterator.iterateNotAlignedSeriesAndChunkMetadataList(device);
+        while (measurementIterator.hasNextSeries()) {
+          String series = measurementIterator.nextSeries();
+          LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList =
+              measurementIterator.getMetadataListForCurrentSeries();
+          measurementSet.add(series);
+        }
+      }
+    }
+    new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new 
ReadChunkCompactionPerformer(), 0)
+        .start();
+    TsFileResource targetFile = tsFileManager.getTsFileList(true).get(0);
+    try (TsFileSequenceReader reader =
+        new TsFileSequenceReader(targetFile.getTsFile().getAbsolutePath())) {
+      Assert.assertEquals(4000, reader.getAllMeasurements().size());
+    }
+    Assert.assertEquals(4000, measurementSet.size());
+  }
+
   @Test
   public void 
getNonAlignedDevicesFromDifferentFilesWithFourLayersInNodeTreeTest()
       throws MetadataException, IOException, WriteProcessException {

Reply via email to