This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a2419ce1f1 Fix compaction metric and read device MetadataIndexNode
7a2419ce1f1 is described below

commit 7a2419ce1f1e77490ae8d80530cbbdd0df36cd2c
Author: shuwenwei <[email protected]>
AuthorDate: Fri May 31 21:29:46 2024 +0800

    Fix compaction metric and read device MetadataIndexNode
---
 .../impl/ReadChunkCompactionPerformer.java         |   2 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   |  45 +++++---
 .../compaction/io/CompactionTsFileReader.java      | 119 +++----------------
 .../compaction/repair/RepairDataFileScanUtil.java  |  18 ++-
 .../inner/InnerSequenceCompactionSpeedTest.java    | 126 +++++++++++++++++++++
 .../utils/MultiTsFileDeviceIteratorTest.java       |   3 +-
 6 files changed, 189 insertions(+), 124 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index b13aca307f9..67b98f175f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -179,7 +179,7 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
       throws IOException, MetadataException, InterruptedException {
     writer.startChunkGroup(device);
     
MultiTsFileDeviceIterator.MultiTsFileNonAlignedMeasurementMetadataListIterator 
seriesIterator =
-        deviceIterator.iterateNotAlignedSeriesAndChunkMetadataList(device);
+        
deviceIterator.iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice();
     while (seriesIterator.hasNextSeries()) {
       checkThreadInterrupted();
       String series = seriesIterator.nextSeries();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 40b156058c3..aa53aed93cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -38,6 +38,7 @@ import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.read.TsFileDeviceIterator;
@@ -334,13 +335,12 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
    * return MultiTsFileNonAlignedMeasurementMetadataListIterator, who iterates 
the measurements of
    * not aligned device
    *
-   * @param device the device to be iterated
    * @return measurement iterator of not aligned device
    * @throws IOException if io errors occurred
    */
   public MultiTsFileNonAlignedMeasurementMetadataListIterator
-      iterateNotAlignedSeriesAndChunkMetadataList(IDeviceID device) throws 
IOException {
-    return new MultiTsFileNonAlignedMeasurementMetadataListIterator(readerMap, 
device);
+      iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice() throws 
IOException {
+    return new MultiTsFileNonAlignedMeasurementMetadataListIterator();
   }
 
   /**
@@ -372,9 +372,12 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
       if (!currentDevice.equals(iterator.current())) {
         continue;
       }
+      MetadataIndexNode firstMeasurementNodeOfCurrentDevice =
+          iterator.getFirstMeasurementNodeOfCurrentDevice();
       TsFileSequenceReader reader = readerMap.get(tsFileResource);
       List<AlignedChunkMetadata> alignedChunkMetadataList =
-          reader.getAlignedChunkMetadata(currentDevice.left);
+          reader.getAlignedChunkMetadataByMetadataIndexNode(
+              currentDevice.left, firstMeasurementNodeOfCurrentDevice);
       applyModificationForAlignedChunkMetadataList(tsFileResource, 
alignedChunkMetadataList);
       readerAndChunkMetadataList.add(new Pair<>(reader, 
alignedChunkMetadataList));
     }
@@ -465,8 +468,6 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
   NonAligned measurement iterator.
    */
   public class MultiTsFileNonAlignedMeasurementMetadataListIterator {
-    private final Map<TsFileResource, TsFileSequenceReader> readerMap;
-    private final IDeviceID device;
     private final LinkedList<String> seriesInThisIteration = new 
LinkedList<>();
     // tsfile sequence reader -> series -> list<ChunkMetadata>
     private final Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
@@ -479,14 +480,31 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
     private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
         chunkMetadataMetadataListOfCurrentCompactingSeries;
 
-    private MultiTsFileNonAlignedMeasurementMetadataListIterator(
-        Map<TsFileResource, TsFileSequenceReader> readerMap, IDeviceID device) 
throws IOException {
-      this.readerMap = readerMap;
-      this.device = device;
+    private MultiTsFileNonAlignedMeasurementMetadataListIterator() throws 
IOException {
+      IDeviceID device = currentDevice.getLeft();
       for (TsFileResource resource : tsFileResourcesSortedByAsc) {
+        TsFileDeviceIterator deviceIterator = deviceIteratorMap.get(resource);
         TsFileSequenceReader reader = readerMap.get(resource);
-        chunkMetadataIteratorMap.put(
-            resource, 
reader.getMeasurementChunkMetadataListMapIterator(device));
+        if (deviceIterator == null || 
!device.equals(deviceIterator.current().getLeft())) {
+          chunkMetadataIteratorMap.put(
+              resource,
+              new Iterator<Map<String, List<ChunkMetadata>>>() {
+                @Override
+                public boolean hasNext() {
+                  return false;
+                }
+
+                @Override
+                public Map<String, List<ChunkMetadata>> next() {
+                  return Collections.emptyMap();
+                }
+              });
+        } else {
+          chunkMetadataIteratorMap.put(
+              resource,
+              reader.getMeasurementChunkMetadataListMapIterator(
+                  deviceIterator.getFirstMeasurementNodeOfCurrentDevice()));
+        }
         chunkMetadataCacheMap.put(reader, new TreeMap<>());
       }
     }
@@ -610,7 +628,8 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
 
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
           readerAndChunkMetadataForThisSeries = new LinkedList<>();
-      PartialPath path = CompactionPathUtils.getPath(device, 
currentCompactingSeries);
+      PartialPath path =
+          CompactionPathUtils.getPath(currentDevice.getLeft(), 
currentCompactingSeries);
 
       for (TsFileResource resource : tsFileResourcesSortedByAsc) {
         TsFileSequenceReader reader = readerMap.get(resource);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index 09afe1dad01..b836352d64f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -27,12 +27,9 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant
 import org.apache.tsfile.file.IMetadataIndexEntry;
 import org.apache.tsfile.file.header.ChunkHeader;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
-import org.apache.tsfile.file.metadata.IChunkMetadata;
-import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
-import org.apache.tsfile.read.TsFileDeviceIterator;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.utils.Pair;
@@ -43,9 +40,7 @@ import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This class extends the TsFileSequenceReader class to read and manage TsFile 
with a focus on
@@ -53,8 +48,8 @@ import java.util.concurrent.atomic.AtomicLong;
  * data read and distinguishing between aligned and not aligned series during 
compaction.
  */
 public class CompactionTsFileReader extends TsFileSequenceReader {
-  /** Tracks the total amount of data (in bytes) that has been read. */
-  private AtomicLong readDataSize = new AtomicLong(0L);
+
+  private long metadataOffset = 0;
 
   /** The type of compaction running. */
   CompactionType compactionType;
@@ -73,13 +68,25 @@ public class CompactionTsFileReader extends 
TsFileSequenceReader {
     super(file);
     this.tsFileInput = new CompactionTsFileInput(tsFileInput);
     this.compactionType = compactionType;
+    this.metadataOffset = readFileMetadata().getMetaOffset();
   }
 
   @Override
   protected ByteBuffer readData(long position, int totalSize) throws 
IOException {
     acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
     ByteBuffer buffer = super.readData(position, totalSize);
-    readDataSize.addAndGet(totalSize);
+    if (position >= metadataOffset) {
+      CompactionMetrics.getInstance()
+          .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
totalSize);
+    } else {
+      CompactionMetrics.getInstance()
+          .recordReadInfo(
+              compactionType,
+              readingAlignedSeries
+                  ? CompactionIoDataType.ALIGNED
+                  : CompactionIoDataType.NOT_ALIGNED,
+              totalSize);
+    }
     return buffer;
   }
 
@@ -95,20 +102,7 @@ public class CompactionTsFileReader extends 
TsFileSequenceReader {
 
   @Override
   public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
-    synchronized (this) {
-      // using synchronized to avoid concurrent read that makes readDataSize 
not correct
-      long before = readDataSize.get();
-      Chunk chunk = super.readMemChunk(metaData);
-      long dataSize = readDataSize.get() - before;
-      CompactionMetrics.getInstance()
-          .recordReadInfo(
-              compactionType,
-              readingAlignedSeries
-                  ? CompactionIoDataType.ALIGNED
-                  : CompactionIoDataType.NOT_ALIGNED,
-              dataSize);
-      return chunk;
-    }
+    return super.readMemChunk(metaData);
   }
 
   public ChunkHeader readChunkHeader(long position) throws IOException {
@@ -127,74 +121,12 @@ public class CompactionTsFileReader extends 
TsFileSequenceReader {
     return readData(startOffset, pageSize);
   }
 
-  @Override
-  public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws 
IOException {
-    long before = readDataSize.get();
-    TsFileDeviceIterator iterator = super.getAllDevicesIteratorWithIsAligned();
-    long dataSize = readDataSize.get() - before;
-    CompactionMetrics.getInstance()
-        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
-    return iterator;
-  }
-
-  @Override
-  public List<IChunkMetadata> getChunkMetadataListByTimeseriesMetadataOffset(
-      long startOffset, long endOffset) throws IOException {
-    long before = readDataSize.get();
-    List<IChunkMetadata> chunkMetadataList =
-        super.getChunkMetadataListByTimeseriesMetadataOffset(startOffset, 
endOffset);
-    long dataSize = readDataSize.get() - before;
-    CompactionMetrics.getInstance()
-        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
-    return chunkMetadataList;
-  }
-
-  @Override
-  public void getDevicesAndEntriesOfOneLeafNode(
-      Long startOffset, Long endOffset, Queue<Pair<IDeviceID, long[]>> 
measurementNodeOffsetQueue)
-      throws IOException {
-    long before = readDataSize.get();
-    super.getDevicesAndEntriesOfOneLeafNode(startOffset, endOffset, 
measurementNodeOffsetQueue);
-    long dataSize = readDataSize.get() - before;
-    CompactionMetrics.getInstance()
-        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
-  }
-
-  @Override
-  public MetadataIndexNode readMetadataIndexNode(long start, long end, boolean 
isDeviceLevel)
-      throws IOException {
-    long before = readDataSize.get();
-    MetadataIndexNode metadataIndexNode = super.readMetadataIndexNode(start, 
end, isDeviceLevel);
-    long dataSize = readDataSize.get() - before;
-    CompactionMetrics.getInstance()
-        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
-    return metadataIndexNode;
-  }
-
-  @Override
-  public Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>>
-      getTimeseriesMetadataOffsetByDevice(
-          MetadataIndexNode measurementNode,
-          Set<String> excludedMeasurementIds,
-          boolean needChunkMetadata)
-          throws IOException {
-    long before = readDataSize.get();
-    Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> result =
-        super.getTimeseriesMetadataOffsetByDevice(
-            measurementNode, excludedMeasurementIds, needChunkMetadata);
-    long dataSize = readDataSize.get() - before;
-    CompactionMetrics.getInstance()
-        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
-    return result;
-  }
-
   public Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
       getTimeseriesMetadataAndOffsetByDevice(
           MetadataIndexNode measurementNode,
           Set<String> excludedMeasurementIds,
           boolean needChunkMetadata)
           throws IOException {
-    long before = readDataSize.get();
     Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>> 
timeseriesMetadataOffsetMap =
         new LinkedHashMap<>();
     List<IMetadataIndexEntry> childrenEntryList = 
measurementNode.getChildren();
@@ -229,28 +161,9 @@ public class CompactionTsFileReader extends 
TsFileSequenceReader {
                 nextLayerMeasurementNode, excludedMeasurementIds, 
needChunkMetadata));
       }
     }
-
-    long dataSize = readDataSize.get() - before;
-    CompactionMetrics.getInstance()
-        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
     return timeseriesMetadataOffsetMap;
   }
 
-  @Override
-  public void getDeviceTimeseriesMetadata(
-      List<TimeseriesMetadata> timeseriesMetadataList,
-      MetadataIndexNode measurementNode,
-      Set<String> excludedMeasurementIds,
-      boolean needChunkMetadata)
-      throws IOException {
-    long before = readDataSize.get();
-    super.getDeviceTimeseriesMetadata(
-        timeseriesMetadataList, measurementNode, excludedMeasurementIds, 
needChunkMetadata);
-    long dataSize = readDataSize.get() - before;
-    CompactionMetrics.getInstance()
-        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
-  }
-
   private void acquireReadDataSizeWithCompactionReadRateLimiter(int 
readDataSize) {
     
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
     
CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
index 3af6fad71b9..1de636c2109 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
@@ -39,6 +39,7 @@ import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -85,11 +86,13 @@ public class RepairDataFileScanUtil {
       while (deviceIterator.hasNext()) {
         Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIterator.next();
         IDeviceID device = deviceIsAlignedPair.getLeft();
+        MetadataIndexNode metadataIndexNode =
+            deviceIterator.getFirstMeasurementNodeOfCurrentDevice();
         boolean isAligned = deviceIsAlignedPair.getRight();
         if (isAligned) {
-          checkAlignedDeviceSeries(reader, device);
+          checkAlignedDeviceSeries(reader, device, metadataIndexNode);
         } else {
-          checkNonAlignedDeviceSeries(reader, device);
+          checkNonAlignedDeviceSeries(reader, device, metadataIndexNode);
         }
       }
     } catch (CompactionLastTimeCheckFailedException 
lastTimeCheckFailedException) {
@@ -108,9 +111,11 @@ public class RepairDataFileScanUtil {
     }
   }
 
-  private void checkAlignedDeviceSeries(TsFileSequenceReader reader, IDeviceID 
device)
+  private void checkAlignedDeviceSeries(
+      TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode 
metadataIndexNode)
       throws IOException {
-    List<AlignedChunkMetadata> chunkMetadataList = 
reader.getAlignedChunkMetadata(device);
+    List<AlignedChunkMetadata> chunkMetadataList =
+        reader.getAlignedChunkMetadataByMetadataIndexNode(device, 
metadataIndexNode);
     for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) {
       IChunkMetadata timeChunkMetadata = 
alignedChunkMetadata.getTimeChunkMetadata();
       Chunk timeChunk = reader.readMemChunk((ChunkMetadata) timeChunkMetadata);
@@ -141,10 +146,11 @@ public class RepairDataFileScanUtil {
     previousTime = Long.MIN_VALUE;
   }
 
-  private void checkNonAlignedDeviceSeries(TsFileSequenceReader reader, 
IDeviceID device)
+  private void checkNonAlignedDeviceSeries(
+      TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode 
metadataIndexNode)
       throws IOException {
     Iterator<Map<String, List<ChunkMetadata>>> 
measurementChunkMetadataListMapIterator =
-        reader.getMeasurementChunkMetadataListMapIterator(device);
+        reader.getMeasurementChunkMetadataListMapIterator(metadataIndexNode);
     while (measurementChunkMetadataListMapIterator.hasNext()) {
       Map<String, List<ChunkMetadata>> measurementChunkMetadataListMap =
           measurementChunkMetadataListMapIterator.next();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
new file mode 100644
index 00000000000..097d4e6a6db
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.inner;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.TimeRange;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class InnerSequenceCompactionSpeedTest extends AbstractCompactionTest {
+  private final long compactionReadThroughputPerSec =
+      
IoTDBDescriptor.getInstance().getConfig().getCompactionReadThroughputMbPerSec();
+
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, 
InterruptedException {
+    super.setUp();
+    CompactionTaskManager.getInstance().setCompactionReadThroughputRate(1);
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    CompactionTaskManager.getInstance()
+        .setCompactionReadThroughputRate(compactionReadThroughputPerSec);
+  }
+
+  @Test
+  public void testManyAlignedDeviceTsFile() throws IOException, 
InterruptedException {
+    List<String> deviceNames = new ArrayList<>();
+    for (int i = 0; i < 100000; i++) {
+      deviceNames.add("d" + i);
+    }
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource)) {
+      for (String device : deviceNames) {
+        writer.startChunkGroup(device);
+        writer.generateSimpleAlignedSeriesToCurrentDevice(
+            Collections.singletonList("s0"),
+            new TimeRange[] {new TimeRange(1, 2)},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4);
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    seqResources.add(resource);
+    tsFileManager.add(resource, true);
+    long tsFileSize = resource.getTsFileSize();
+    Thread thread =
+        new Thread(
+            () -> {
+              InnerSpaceCompactionTask task =
+                  new InnerSpaceCompactionTask(
+                      tsFileManager, 0, seqResources, true, new 
ReadChunkCompactionPerformer(), 0);
+              task.start();
+            });
+    thread.start();
+    thread.join(TimeUnit.SECONDS.toMillis(30 + tsFileSize / IoTDBConstant.MB));
+  }
+
+  @Test
+  public void testManyNotAlignedDeviceTsFile() throws IOException {
+    List<String> deviceNames = new ArrayList<>();
+    for (int i = 0; i < 100000; i++) {
+      deviceNames.add("d" + i);
+    }
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource)) {
+      for (String device : deviceNames) {
+        writer.startChunkGroup(device);
+        writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+            "s0", new TimeRange[] {new TimeRange(1, 2)}, TSEncoding.PLAIN, 
CompressionType.LZ4);
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    seqResources.add(resource);
+    tsFileManager.add(resource, true);
+    long tsFileSize = resource.getTsFileSize();
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            tsFileManager, 0, seqResources, true, new 
ReadChunkCompactionPerformer(), 0);
+    Assert.assertTrue(task.start());
+    Assert.assertTrue(
+        TimeUnit.SECONDS.toMillis(tsFileSize / IoTDBConstant.MB + 30) > 
task.getTimeCost());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
index c9b9b62f200..863efce14cf 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
@@ -133,7 +133,8 @@ public class MultiTsFileDeviceIteratorTest extends 
AbstractCompactionTest {
         IDeviceID device = deviceIsAlignedPair.getLeft();
         
MultiTsFileDeviceIterator.MultiTsFileNonAlignedMeasurementMetadataListIterator
             measurementIterator =
-                
multiTsFileDeviceIterator.iterateNotAlignedSeriesAndChunkMetadataList(device);
+                multiTsFileDeviceIterator
+                    
.iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice();
         while (measurementIterator.hasNextSeries()) {
           String series = measurementIterator.nextSeries();
           LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList =

Reply via email to