This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryIORecord
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit 88c9ebd47433d301e16e89eaa90a8b48f77058b2
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Nov 12 15:54:20 2024 +0800

    Add LongConsumer ioSizeRecorder in TsFileSequenceReader for IoTDB scan
---
 .../org/apache/tsfile/file/header/ChunkHeader.java |  21 +++
 .../apache/tsfile/read/TsFileSequenceReader.java   | 173 +++++++++++++++++----
 2 files changed, 166 insertions(+), 28 deletions(-)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java 
b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
index 6cb3bee5..c07071fb 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.LongConsumer;
 
 public class ChunkHeader {
 
@@ -189,7 +190,23 @@ public class ChunkHeader {
    * @throws IOException IOException
    */
   public static ChunkHeader deserializeFrom(TsFileInput input, long offset) 
throws IOException {
+    return deserializeFrom(input, offset, null);
+  }
 
+  /**
+   * deserialize from TsFileInput, the marker has not been read.
+   *
+   * @param input TsFileInput
+   * @param offset offset
+   * @param ioSizeRecorder can be null
+   * @return CHUNK_HEADER object
+   * @throws IOException IOException
+   */
+  public static ChunkHeader deserializeFrom(
+      TsFileInput input, long offset, LongConsumer ioSizeRecorder) throws 
IOException {
+
+    // only 6 bytes, no need to call ioSizeRecorder.accept alone, combine into the remaining read
+    // operation
     ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Integer.BYTES + 1);
     input.read(buffer, offset);
     buffer.flip();
@@ -208,6 +225,10 @@ public class ChunkHeader {
             + TSEncoding.getSerializedSize();
     buffer = ByteBuffer.allocate(remainingBytes);
 
+    if (ioSizeRecorder != null) {
+      ioSizeRecorder.accept((long) alreadyReadLength + remainingBytes);
+    }
+
     input.read(buffer, offset + alreadyReadLength);
     buffer.flip();
 
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java 
b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index 1374a6d4..7e78187c 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -103,6 +103,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
 
 public class TsFileSequenceReader implements AutoCloseable {
@@ -340,13 +341,20 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @throws IOException io error
    */
   public TsFileMetadata readFileMetadata() throws IOException {
+    return readFileMetadata(null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public TsFileMetadata readFileMetadata(LongConsumer ioSizeRecorder) throws 
IOException {
     try {
       if (tsFileMetaData == null) {
         synchronized (this) {
           if (tsFileMetaData == null) {
             tsFileMetaData =
                 deserializeConfig.tsFileMetadataBufferDeserializer.deserialize(
-                    readData(fileMetadataPos, fileMetadataSize), 
deserializeConfig);
+                    readData(fileMetadataPos, fileMetadataSize, 
ioSizeRecorder), deserializeConfig);
           }
         }
       }
@@ -365,7 +373,14 @@ public class TsFileSequenceReader implements AutoCloseable 
{
    * @throws IOException io error
    */
   public BloomFilter readBloomFilter() throws IOException {
-    readFileMetadata();
+    return readBloomFilter(null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public BloomFilter readBloomFilter(LongConsumer ioSizeRecorder) throws 
IOException {
+    readFileMetadata(ioSizeRecorder);
     return tsFileMetaData.getBloomFilter();
   }
 
@@ -378,8 +393,15 @@ public class TsFileSequenceReader implements AutoCloseable 
{
    * @throws IOException if an I/O error occurs while reading the file metadata
    */
   public EncryptParameter getEncryptParam() throws IOException {
+    return getEncryptParam(null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public EncryptParameter getEncryptParam(LongConsumer ioSizeRecorder) throws 
IOException {
     if (fileMetadataSize != 0) {
-      readFileMetadata();
+      readFileMetadata(ioSizeRecorder);
       return tsFileMetaData.getEncryptParam();
     }
     return EncryptUtils.encryptParam;
@@ -461,7 +483,7 @@ public class TsFileSequenceReader implements AutoCloseable {
         throw e;
       }
       metadataIndexPair =
-          getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, 
measurement, false);
+          getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, 
measurement, false, null);
     }
     if (metadataIndexPair == null) {
       return null;
@@ -526,7 +548,8 @@ public class TsFileSequenceReader implements AutoCloseable {
     }
     firstTimeseriesMetadata = getTimeColumnMetadata(metadataIndexNode);
     metadataIndexPair =
-        getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, 
path.getMeasurement(), false);
+        getMetadataAndEndOffsetOfMeasurementNode(
+            metadataIndexNode, path.getMeasurement(), false, null);
 
     if (metadataIndexPair == null) {
       return null;
@@ -561,15 +584,25 @@ public class TsFileSequenceReader implements 
AutoCloseable {
   /* Find the leaf node that contains path, return all the sensors in that 
leaf node which are also in allSensors set */
   public List<TimeseriesMetadata> readTimeseriesMetadata(
       IDeviceID device, String measurement, Set<String> allSensors) throws 
IOException {
+    return readTimeseriesMetadata(device, measurement, allSensors, null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public List<TimeseriesMetadata> readTimeseriesMetadata(
+      IDeviceID device, String measurement, Set<String> allSensors, 
LongConsumer ioSizeRecorder)
+      throws IOException {
     Pair<IMetadataIndexEntry, Long> metadataIndexPair =
-        getLeafMetadataIndexPair(device, measurement);
+        getLeafMetadataIndexPair(device, measurement, ioSizeRecorder);
     if (metadataIndexPair == null) {
       return Collections.emptyList();
     }
     List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
 
     if (metadataIndexPair.right - metadataIndexPair.left.getOffset() < 
Integer.MAX_VALUE) {
-      ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right);
+      ByteBuffer buffer =
+          readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right, ioSizeRecorder);
       while (buffer.hasRemaining()) {
         TimeseriesMetadata timeseriesMetadata;
         try {
@@ -587,6 +620,9 @@ public class TsFileSequenceReader implements AutoCloseable {
       // when the buffer length is over than Integer.MAX_VALUE,
       // using tsFileInput to get timeseriesMetadataList
       synchronized (this) {
+        if (ioSizeRecorder != null) {
+          ioSizeRecorder.accept(metadataIndexPair.right - 
metadataIndexPair.left.getOffset());
+        }
         tsFileInput.position(metadataIndexPair.left.getOffset());
         while (tsFileInput.position() < metadataIndexPair.right) {
           TimeseriesMetadata timeseriesMetadata;
@@ -610,16 +646,17 @@ public class TsFileSequenceReader implements 
AutoCloseable {
 
   /* Get leaf MetadataIndexPair which contains path */
   private Pair<IMetadataIndexEntry, Long> getLeafMetadataIndexPair(
-      IDeviceID device, String measurement) throws IOException {
-    readFileMetadata();
+      IDeviceID device, String measurement, LongConsumer ioSizeRecorder) 
throws IOException {
+    readFileMetadata(ioSizeRecorder);
     MetadataIndexNode deviceMetadataIndexNode =
         tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
     Pair<IMetadataIndexEntry, Long> metadataIndexPair =
-        getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, 
true);
+        getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, 
true, ioSizeRecorder);
     if (metadataIndexPair == null) {
       return null;
     }
-    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right);
+    ByteBuffer buffer =
+        readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, 
ioSizeRecorder);
     MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
     if 
(!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT))
 {
       try {
@@ -631,7 +668,8 @@ public class TsFileSequenceReader implements AutoCloseable {
         throw e;
       }
       metadataIndexPair =
-          getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, 
measurement, false);
+          getMetadataAndEndOffsetOfMeasurementNode(
+              metadataIndexNode, measurement, false, ioSizeRecorder);
     }
     return metadataIndexPair;
   }
@@ -700,7 +738,7 @@ public class TsFileSequenceReader implements AutoCloseable {
       List<TimeseriesMetadata> timeseriesMetadataList, MetadataIndexNode node, 
String measurement)
       throws IOException {
     Pair<IMetadataIndexEntry, Long> measurementMetadataIndexPair =
-        getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false);
+        getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false, 
null);
 
     if (measurementMetadataIndexPair == null) {
       return false;
@@ -1400,6 +1438,18 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    */
   protected Pair<IMetadataIndexEntry, Long> 
getMetadataAndEndOffsetOfDeviceNode(
       MetadataIndexNode metadataIndex, IDeviceID deviceID, boolean 
exactSearch) throws IOException {
+    return getMetadataAndEndOffsetOfDeviceNode(metadataIndex, deviceID, 
exactSearch, null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  protected Pair<IMetadataIndexEntry, Long> 
getMetadataAndEndOffsetOfDeviceNode(
+      MetadataIndexNode metadataIndex,
+      IDeviceID deviceID,
+      boolean exactSearch,
+      LongConsumer ioSizeRecorder)
+      throws IOException {
     if (metadataIndex == null) {
       return null;
     }
@@ -1411,12 +1461,14 @@ public class TsFileSequenceReader implements 
AutoCloseable {
       if 
(MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType())) {
         Pair<IMetadataIndexEntry, Long> childIndexEntry =
             metadataIndex.getChildIndexEntry(deviceID, false);
-        ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), 
childIndexEntry.right);
+        ByteBuffer buffer =
+            readData(childIndexEntry.left.getOffset(), childIndexEntry.right, 
ioSizeRecorder);
         return getMetadataAndEndOffsetOfDeviceNode(
             
deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
                 buffer, deserializeConfig),
             deviceID,
-            exactSearch);
+            exactSearch,
+            ioSizeRecorder);
       } else {
         return metadataIndex.getChildIndexEntry(deviceID, exactSearch);
       }
@@ -1435,10 +1487,15 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @param measurement target measurement
    * @param exactSearch whether is in exact search mode, return null when 
there is no entry with
    *     name; or else return the nearest MetadataIndexEntry before it (for 
deeper search)
+   * @param ioSizeRecorder can be null
    * @return target MetadataIndexEntry, endOffset pair
    */
   protected Pair<IMetadataIndexEntry, Long> 
getMetadataAndEndOffsetOfMeasurementNode(
-      MetadataIndexNode metadataIndex, String measurement, boolean 
exactSearch) throws IOException {
+      MetadataIndexNode metadataIndex,
+      String measurement,
+      boolean exactSearch,
+      LongConsumer ioSizeRecorder)
+      throws IOException {
     if 
(MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType())
         || 
MetadataIndexNodeType.LEAF_DEVICE.equals(metadataIndex.getNodeType())) {
       throw new IllegalArgumentException();
@@ -1447,12 +1504,14 @@ public class TsFileSequenceReader implements 
AutoCloseable {
       if 
(MetadataIndexNodeType.INTERNAL_MEASUREMENT.equals(metadataIndex.getNodeType()))
 {
         Pair<IMetadataIndexEntry, Long> childIndexEntry =
             metadataIndex.getChildIndexEntry(measurement, false);
-        ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), 
childIndexEntry.right);
+        ByteBuffer buffer =
+            readData(childIndexEntry.left.getOffset(), childIndexEntry.right, 
ioSizeRecorder);
         return getMetadataAndEndOffsetOfMeasurementNode(
             
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
                 buffer, deserializeConfig),
             measurement,
-            exactSearch);
+            exactSearch,
+            ioSizeRecorder);
       } else {
         return metadataIndex.getChildIndexEntry(measurement, exactSearch);
       }
@@ -1527,10 +1586,12 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * read the chunk's header.
    *
    * @param position the file offset of this chunk's header
+   * @param ioSizeRecorder can be null
    */
-  private ChunkHeader readChunkHeader(long position) throws IOException {
+  private ChunkHeader readChunkHeader(long position, LongConsumer 
ioSizeRecorder)
+      throws IOException {
     try {
-      return ChunkHeader.deserializeFrom(tsFileInput, position);
+      return ChunkHeader.deserializeFrom(tsFileInput, position, 
ioSizeRecorder);
     } catch (StopReadTsFileByInterruptException e) {
       throw e;
     } catch (Throwable t) {
@@ -1547,8 +1608,21 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return the pages of this chunk
    */
   public ByteBuffer readChunk(long position, int dataSize) throws IOException {
+    return readChunk(position, dataSize, null);
+  }
+
+  /**
+   * notice, this function will modify channel's position.
+   *
+   * @param dataSize the size of chunkdata
+   * @param position the offset of the chunk data
+   * @param ioSizeRecorder can be null
+   * @return the pages of this chunk
+   */
+  public ByteBuffer readChunk(long position, int dataSize, LongConsumer 
ioSizeRecorder)
+      throws IOException {
     try {
-      return readData(position, dataSize);
+      return readData(position, dataSize, ioSizeRecorder);
     } catch (StopReadTsFileByInterruptException e) {
       throw e;
     } catch (Throwable t) {
@@ -1563,10 +1637,18 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return -chunk
    */
   public Chunk readMemChunk(long offset) throws IOException {
+    return readMemChunk(offset, null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public Chunk readMemChunk(long offset, LongConsumer ioSizeRecorder) throws 
IOException {
     try {
-      ChunkHeader header = readChunkHeader(offset);
-      ByteBuffer buffer = readChunk(offset + header.getSerializedSize(), 
header.getDataSize());
-      return new Chunk(header, buffer, getEncryptParam());
+      ChunkHeader header = readChunkHeader(offset, ioSizeRecorder);
+      ByteBuffer buffer =
+          readChunk(offset + header.getSerializedSize(), header.getDataSize(), 
ioSizeRecorder);
+      return new Chunk(header, buffer, getEncryptParam(ioSizeRecorder));
     } catch (StopReadTsFileByInterruptException e) {
       throw e;
     } catch (Throwable t) {
@@ -1583,7 +1665,7 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    */
   public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
     try {
-      ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader());
+      ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), 
null);
       ByteBuffer buffer =
           readChunk(
               metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), 
header.getDataSize());
@@ -1608,7 +1690,7 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return chunk
    */
   public Chunk readMemChunk(CachedChunkLoaderImpl.ChunkCacheKey chunkCacheKey) 
throws IOException {
-    ChunkHeader header = 
readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader());
+    ChunkHeader header = 
readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader(), null);
     ByteBuffer buffer =
         readChunk(
             chunkCacheKey.getOffsetOfChunkHeader() + 
header.getSerializedSize(),
@@ -1648,7 +1730,7 @@ public class TsFileSequenceReader implements 
AutoCloseable {
       return null;
     }
     IChunkMetadata lastChunkMetadata = 
chunkMetadataList.get(chunkMetadataList.size() - 1);
-    ChunkHeader header = 
readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader());
+    ChunkHeader header = 
readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), null);
     return new MeasurementSchema(
         lastChunkMetadata.getMeasurementUid(),
         header.getDataType(),
@@ -1776,6 +1858,26 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return data that been read.
    */
   protected ByteBuffer readData(long position, int totalSize) throws 
IOException {
+    return readData(position, totalSize, null);
+  }
+
+  /**
+   * read data from tsFileInput, from the current position (if position = -1), 
or the given
+   * position. <br>
+   * if position = -1, the tsFileInput's position will be changed to the current position + real
+   * data size that has been read. Otherwise, the tsFileInput's position is not changed.
+   *
+   * @param position the start position of data in the tsFileInput, or the 
current position if
+   *     position = -1
+   * @param totalSize the size of data that want to read
+   * @param ioSizeRecorder can be null
+   * @return data that has been read.
+   */
+  protected ByteBuffer readData(long position, int totalSize, LongConsumer 
ioSizeRecorder)
+      throws IOException {
+    if (ioSizeRecorder != null) {
+      ioSizeRecorder.accept(totalSize);
+    }
     int allocateSize = Math.min(MAX_READ_BUFFER_SIZE, totalSize);
     int allocateNum = (int) Math.ceil((double) totalSize / allocateSize);
     ByteBuffer buffer = ByteBuffer.allocate(totalSize);
@@ -1817,8 +1919,23 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return data that been read.
    */
   protected ByteBuffer readData(long start, long end) throws IOException {
+    return readData(start, end, null);
+  }
+
+  /**
+   * read data from tsFileInput, from the current position (if position = -1), 
or the given
+   * position.
+   *
+   * @param start the start position of data in the tsFileInput, or the 
current position if position
+   *     = -1
+   * @param end the end position of data that want to read
+   * @param ioSizeRecorder can be null
+   * @return data that has been read.
+   */
+  protected ByteBuffer readData(long start, long end, LongConsumer 
ioSizeRecorder)
+      throws IOException {
     try {
-      return readData(start, (int) (end - start));
+      return readData(start, (int) (end - start), ioSizeRecorder);
     } catch (StopReadTsFileByInterruptException e) {
       throw e;
     } catch (Throwable t) {

Reply via email to