This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch iotdb
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/iotdb by this push:
     new af5cc2e0 Add LongConsumer ioSizeRecorder in TsFileSequenceReader for 
IoTDB scan
af5cc2e0 is described below

commit af5cc2e0e4e8a6b81cec172f4155890e201cc8a8
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Nov 13 09:28:12 2024 +0800

    Add LongConsumer ioSizeRecorder in TsFileSequenceReader for IoTDB scan
---
 .../org/apache/tsfile/file/header/ChunkHeader.java |  21 ++
 .../apache/tsfile/read/TsFileSequenceReader.java   | 269 +++++++++++++++++----
 .../apache/tsfile/read/UnClosedTsFileReader.java   |  12 +-
 .../tsfile/read/TimeSeriesMetadataReadTest.java    |   4 +-
 .../tsfile/read/UnClosedTsFileReaderTest.java      |   2 +-
 5 files changed, 256 insertions(+), 52 deletions(-)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java 
b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
index 6cb3bee5..c07071fb 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.LongConsumer;
 
 public class ChunkHeader {
 
@@ -189,7 +190,23 @@ public class ChunkHeader {
    * @throws IOException IOException
    */
   public static ChunkHeader deserializeFrom(TsFileInput input, long offset) 
throws IOException {
+    return deserializeFrom(input, offset, null);
+  }
 
+  /**
+   * deserialize from TsFileInput, the marker has not been read.
+   *
+   * @param input TsFileInput
+   * @param offset offset
+   * @param ioSizeRecorder can be null
+   * @return CHUNK_HEADER object
+   * @throws IOException IOException
+   */
+  public static ChunkHeader deserializeFrom(
+      TsFileInput input, long offset, LongConsumer ioSizeRecorder) throws 
IOException {
+
+    // only 6 bytes, no need to call ioSizeRecorder.accept alone, combine into 
the remaining read
+    // operation
     ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Integer.BYTES + 1);
     input.read(buffer, offset);
     buffer.flip();
@@ -208,6 +225,10 @@ public class ChunkHeader {
             + TSEncoding.getSerializedSize();
     buffer = ByteBuffer.allocate(remainingBytes);
 
+    if (ioSizeRecorder != null) {
+      ioSizeRecorder.accept((long) alreadyReadLength + remainingBytes);
+    }
+
     input.read(buffer, offset + alreadyReadLength);
     buffer.flip();
 
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java 
b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index d55daf26..06ec5466 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -103,6 +103,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
 
 public class TsFileSequenceReader implements AutoCloseable {
@@ -144,7 +145,21 @@ public class TsFileSequenceReader implements AutoCloseable 
{
    * @throws IOException If some I/O error occurs
    */
   public TsFileSequenceReader(String file) throws IOException {
-    this(file, true);
+    this(file, null);
+  }
+
+  /**
+   * Create a file reader of the given file. The reader will read the tail of 
the file to get the
+   * file metadata size. Then the reader will skip the first
+   * TSFileConfig.MAGIC_STRING.getBytes().length + 
TSFileConfig.NUMBER_VERSION.getBytes().length
+   * bytes of the file for preparing reading real data.
+   *
+   * @param file the data file
+   * @param ioSizeRecorder can be null
+   * @throws IOException If some I/O error occurs
+   */
+  public TsFileSequenceReader(String file, LongConsumer ioSizeRecorder) throws 
IOException {
+    this(file, true, ioSizeRecorder);
   }
 
   /**
@@ -154,6 +169,18 @@ public class TsFileSequenceReader implements AutoCloseable 
{
    * @param loadMetadataSize -whether load meta data size
    */
   public TsFileSequenceReader(String file, boolean loadMetadataSize) throws 
IOException {
+    this(file, loadMetadataSize, null);
+  }
+
+  /**
+   * construct function for TsFileSequenceReader.
+   *
+   * @param file -given file name
+   * @param loadMetadataSize -whether load meta data size
+   * @param ioSizeRecorder can be null
+   */
+  public TsFileSequenceReader(String file, boolean loadMetadataSize, 
LongConsumer ioSizeRecorder)
+      throws IOException {
     if (resourceLogger.isDebugEnabled()) {
       resourceLogger.debug("{} reader is opened. {}", file, 
getClass().getName());
     }
@@ -161,9 +188,9 @@ public class TsFileSequenceReader implements AutoCloseable {
     tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
 
     try {
-      loadFileVersion();
+      loadFileVersion(ioSizeRecorder);
       if (loadMetadataSize) {
-        loadMetadataSize();
+        loadMetadataSize(ioSizeRecorder);
       }
     } catch (Throwable e) {
       tsFileInput.close();
@@ -224,10 +251,14 @@ public class TsFileSequenceReader implements 
AutoCloseable {
     this.fileMetadataSize = fileMetadataSize;
   }
 
-  private void loadFileVersion() throws IOException {
+  // ioSizeRecorder can be null
+  private void loadFileVersion(LongConsumer ioSizeRecorder) throws IOException 
{
     try {
       
tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes(TSFileConfig.STRING_CHARSET).length);
-      final ByteBuffer buffer = ByteBuffer.allocate(1);
+      final ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES);
+      if (ioSizeRecorder != null) {
+        ioSizeRecorder.accept(Byte.BYTES);
+      }
       tsFileInput.read(buffer);
       buffer.flip();
       fileVersion = buffer.get();
@@ -255,8 +286,19 @@ public class TsFileSequenceReader implements AutoCloseable 
{
   }
 
   public void loadMetadataSize() throws IOException {
-    ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+    loadMetadataSize(null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public void loadMetadataSize(LongConsumer ioSizeRecorder) throws IOException 
{
+    int readSize = Integer.BYTES;
+    ByteBuffer metadataSize = ByteBuffer.allocate(readSize);
     if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
+      if (ioSizeRecorder != null) {
+        ioSizeRecorder.accept(readSize);
+      }
       tsFileInput.read(
           metadataSize,
           tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - 
Integer.BYTES);
@@ -329,7 +371,8 @@ public class TsFileSequenceReader implements AutoCloseable {
   /** this function reads version number and checks compatibility of TsFile. */
   public byte readVersionNumber() throws IOException {
     ByteBuffer versionNumberByte = ByteBuffer.allocate(Byte.BYTES);
-    tsFileInput.read(versionNumberByte, 
TSFileConfig.MAGIC_STRING.getBytes().length);
+    tsFileInput.read(
+        versionNumberByte, 
TSFileConfig.MAGIC_STRING.getBytes(TSFileConfig.STRING_CHARSET).length);
     versionNumberByte.flip();
     return versionNumberByte.get();
   }
@@ -340,13 +383,20 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @throws IOException io error
    */
   public TsFileMetadata readFileMetadata() throws IOException {
+    return readFileMetadata(null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public TsFileMetadata readFileMetadata(LongConsumer ioSizeRecorder) throws 
IOException {
     try {
       if (tsFileMetaData == null) {
         synchronized (this) {
           if (tsFileMetaData == null) {
             tsFileMetaData =
                 deserializeConfig.tsFileMetadataBufferDeserializer.deserialize(
-                    readData(fileMetadataPos, fileMetadataSize), 
deserializeConfig);
+                    readData(fileMetadataPos, fileMetadataSize, 
ioSizeRecorder), deserializeConfig);
           }
         }
       }
@@ -365,7 +415,14 @@ public class TsFileSequenceReader implements AutoCloseable 
{
    * @throws IOException io error
    */
   public BloomFilter readBloomFilter() throws IOException {
-    readFileMetadata();
+    return readBloomFilter(null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public BloomFilter readBloomFilter(LongConsumer ioSizeRecorder) throws 
IOException {
+    readFileMetadata(ioSizeRecorder);
     return tsFileMetaData.getBloomFilter();
   }
 
@@ -378,8 +435,15 @@ public class TsFileSequenceReader implements AutoCloseable 
{
    * @throws IOException if an I/O error occurs while reading the file metadata
    */
   public EncryptParameter getEncryptParam() throws IOException {
+    return getEncryptParam(null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public EncryptParameter getEncryptParam(LongConsumer ioSizeRecorder) throws 
IOException {
     if (fileMetadataSize != 0) {
-      readFileMetadata();
+      readFileMetadata(ioSizeRecorder);
       return tsFileMetaData.getEncryptParam();
     }
     return EncryptUtils.encryptParam;
@@ -438,18 +502,28 @@ public class TsFileSequenceReader implements 
AutoCloseable {
 
   public TimeseriesMetadata readTimeseriesMetadata(
       IDeviceID device, String measurement, boolean ignoreNotExists) throws 
IOException {
-    readFileMetadata();
+    return readTimeseriesMetadata(device, measurement, ignoreNotExists, null);
+  }
+
+  public TimeseriesMetadata readTimeseriesMetadata(
+      IDeviceID device,
+      String measurement,
+      boolean ignoreNotExistDevice,
+      LongConsumer ioSizeConsumer)
+      throws IOException {
+    readFileMetadata(ioSizeConsumer);
     MetadataIndexNode deviceMetadataIndexNode =
         tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
     Pair<IMetadataIndexEntry, Long> metadataIndexPair =
-        getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, 
true);
+        getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, 
true, ioSizeConsumer);
     if (metadataIndexPair == null) {
-      if (ignoreNotExists) {
+      if (ignoreNotExistDevice) {
         return null;
       }
       throw new IOException("Device {" + device + "} is not in 
tsFileMetaData");
     }
-    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right);
+    ByteBuffer buffer =
+        readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, 
ioSizeConsumer);
     MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
     if 
(!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT))
 {
       try {
@@ -461,14 +535,16 @@ public class TsFileSequenceReader implements 
AutoCloseable {
         throw e;
       }
       metadataIndexPair =
-          getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, 
measurement, false);
+          getMetadataAndEndOffsetOfMeasurementNode(
+              metadataIndexNode, measurement, false, ioSizeConsumer);
     }
     if (metadataIndexPair == null) {
       return null;
     }
     List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
     if (metadataIndexPair.right - metadataIndexPair.left.getOffset() < 
Integer.MAX_VALUE) {
-      buffer = readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right);
+      buffer =
+          readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right, ioSizeConsumer);
       while (buffer.hasRemaining()) {
         try {
           
timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
@@ -479,6 +555,9 @@ public class TsFileSequenceReader implements AutoCloseable {
         }
       }
     } else {
+      if (ioSizeConsumer != null) {
+        ioSizeConsumer.accept(metadataIndexPair.right - 
metadataIndexPair.left.getOffset());
+      }
       // when the buffer length is over than Integer.MAX_VALUE,
       // using tsFileInput to get timeseriesMetadataList
       tsFileInput.position(metadataIndexPair.left.getOffset());
@@ -499,7 +578,7 @@ public class TsFileSequenceReader implements AutoCloseable {
   }
 
   // This method is only used for TsFile
-  public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean 
ignoreNotExists)
+  public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean 
ignoreNotExistDevice)
       throws IOException {
     readFileMetadata();
     MetadataIndexNode deviceMetadataIndexNode =
@@ -507,7 +586,7 @@ public class TsFileSequenceReader implements AutoCloseable {
     Pair<IMetadataIndexEntry, Long> metadataIndexPair =
         getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, 
path.getIDeviceID(), true);
     if (metadataIndexPair == null) {
-      if (ignoreNotExists) {
+      if (ignoreNotExistDevice) {
         return null;
       }
       throw new IOException("Device {" + path.getDeviceString() + "} is not in 
tsFileMetaData");
@@ -526,7 +605,8 @@ public class TsFileSequenceReader implements AutoCloseable {
     }
     firstTimeseriesMetadata = getTimeColumnMetadata(metadataIndexNode);
     metadataIndexPair =
-        getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, 
path.getMeasurement(), false);
+        getMetadataAndEndOffsetOfMeasurementNode(
+            metadataIndexNode, path.getMeasurement(), false, null);
 
     if (metadataIndexPair == null) {
       return null;
@@ -558,18 +638,33 @@ public class TsFileSequenceReader implements 
AutoCloseable {
     }
   }
 
-  /* Find the leaf node that contains path, return all the sensors in that 
leaf node which are also in allSensors set */
+  /**
+   * Find the leaf node that contains path, return all the sensors in that 
leaf node which are also
+   * in allSensors set
+   *
+   * @param ignoreNotExistDevice whether throw IOException if device not found
+   * @param ioSizeRecorder can be null
+   */
   public List<TimeseriesMetadata> readTimeseriesMetadata(
-      IDeviceID device, String measurement, Set<String> allSensors) throws 
IOException {
+      IDeviceID device,
+      String measurement,
+      Set<String> allSensors,
+      boolean ignoreNotExistDevice,
+      LongConsumer ioSizeRecorder)
+      throws IOException {
     Pair<IMetadataIndexEntry, Long> metadataIndexPair =
-        getLeafMetadataIndexPair(device, measurement);
+        getLeafMetadataIndexPair(device, measurement, ioSizeRecorder);
     if (metadataIndexPair == null) {
-      return Collections.emptyList();
+      if (ignoreNotExistDevice) {
+        return Collections.emptyList();
+      }
+      throw new IOException("Device {" + device + "} is not in 
tsFileMetaData");
     }
     List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
 
     if (metadataIndexPair.right - metadataIndexPair.left.getOffset() < 
Integer.MAX_VALUE) {
-      ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right);
+      ByteBuffer buffer =
+          readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right, ioSizeRecorder);
       while (buffer.hasRemaining()) {
         TimeseriesMetadata timeseriesMetadata;
         try {
@@ -587,6 +682,9 @@ public class TsFileSequenceReader implements AutoCloseable {
       // when the buffer length is over than Integer.MAX_VALUE,
       // using tsFileInput to get timeseriesMetadataList
       synchronized (this) {
+        if (ioSizeRecorder != null) {
+          ioSizeRecorder.accept(metadataIndexPair.right - 
metadataIndexPair.left.getOffset());
+        }
         tsFileInput.position(metadataIndexPair.left.getOffset());
         while (tsFileInput.position() < metadataIndexPair.right) {
           TimeseriesMetadata timeseriesMetadata;
@@ -610,16 +708,17 @@ public class TsFileSequenceReader implements 
AutoCloseable {
 
   /* Get leaf MetadataIndexPair which contains path */
   private Pair<IMetadataIndexEntry, Long> getLeafMetadataIndexPair(
-      IDeviceID device, String measurement) throws IOException {
-    readFileMetadata();
+      IDeviceID device, String measurement, LongConsumer ioSizeRecorder) 
throws IOException {
+    readFileMetadata(ioSizeRecorder);
     MetadataIndexNode deviceMetadataIndexNode =
         tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
     Pair<IMetadataIndexEntry, Long> metadataIndexPair =
-        getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, 
true);
+        getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, 
true, ioSizeRecorder);
     if (metadataIndexPair == null) {
       return null;
     }
-    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right);
+    ByteBuffer buffer =
+        readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, 
ioSizeRecorder);
     MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
     if 
(!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT))
 {
       try {
@@ -631,7 +730,8 @@ public class TsFileSequenceReader implements AutoCloseable {
         throw e;
       }
       metadataIndexPair =
-          getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, 
measurement, false);
+          getMetadataAndEndOffsetOfMeasurementNode(
+              metadataIndexNode, measurement, false, ioSizeRecorder);
     }
     return metadataIndexPair;
   }
@@ -700,7 +800,7 @@ public class TsFileSequenceReader implements AutoCloseable {
       List<TimeseriesMetadata> timeseriesMetadataList, MetadataIndexNode node, 
String measurement)
       throws IOException {
     Pair<IMetadataIndexEntry, Long> measurementMetadataIndexPair =
-        getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false);
+        getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false, 
null);
 
     if (measurementMetadataIndexPair == null) {
       return false;
@@ -1416,6 +1516,18 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    */
   protected Pair<IMetadataIndexEntry, Long> 
getMetadataAndEndOffsetOfDeviceNode(
       MetadataIndexNode metadataIndex, IDeviceID deviceID, boolean 
exactSearch) throws IOException {
+    return getMetadataAndEndOffsetOfDeviceNode(metadataIndex, deviceID, 
exactSearch, null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  protected Pair<IMetadataIndexEntry, Long> 
getMetadataAndEndOffsetOfDeviceNode(
+      MetadataIndexNode metadataIndex,
+      IDeviceID deviceID,
+      boolean exactSearch,
+      LongConsumer ioSizeRecorder)
+      throws IOException {
     if (metadataIndex == null) {
       return null;
     }
@@ -1427,12 +1539,14 @@ public class TsFileSequenceReader implements 
AutoCloseable {
       if 
(MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType())) {
         Pair<IMetadataIndexEntry, Long> childIndexEntry =
             metadataIndex.getChildIndexEntry(deviceID, false);
-        ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), 
childIndexEntry.right);
+        ByteBuffer buffer =
+            readData(childIndexEntry.left.getOffset(), childIndexEntry.right, 
ioSizeRecorder);
         return getMetadataAndEndOffsetOfDeviceNode(
             
deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
                 buffer, deserializeConfig),
             deviceID,
-            exactSearch);
+            exactSearch,
+            ioSizeRecorder);
       } else {
         return metadataIndex.getChildIndexEntry(deviceID, exactSearch);
       }
@@ -1451,10 +1565,15 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @param measurement target measurement
    * @param exactSearch whether is in exact search mode, return null when 
there is no entry with
    *     name; or else return the nearest MetadataIndexEntry before it (for 
deeper search)
+   * @param ioSizeRecorder can be null
    * @return target MetadataIndexEntry, endOffset pair
    */
   protected Pair<IMetadataIndexEntry, Long> 
getMetadataAndEndOffsetOfMeasurementNode(
-      MetadataIndexNode metadataIndex, String measurement, boolean 
exactSearch) throws IOException {
+      MetadataIndexNode metadataIndex,
+      String measurement,
+      boolean exactSearch,
+      LongConsumer ioSizeRecorder)
+      throws IOException {
     if 
(MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType())
         || 
MetadataIndexNodeType.LEAF_DEVICE.equals(metadataIndex.getNodeType())) {
       throw new IllegalArgumentException();
@@ -1463,12 +1582,14 @@ public class TsFileSequenceReader implements 
AutoCloseable {
       if 
(MetadataIndexNodeType.INTERNAL_MEASUREMENT.equals(metadataIndex.getNodeType()))
 {
         Pair<IMetadataIndexEntry, Long> childIndexEntry =
             metadataIndex.getChildIndexEntry(measurement, false);
-        ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), 
childIndexEntry.right);
+        ByteBuffer buffer =
+            readData(childIndexEntry.left.getOffset(), childIndexEntry.right, 
ioSizeRecorder);
         return getMetadataAndEndOffsetOfMeasurementNode(
             
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
                 buffer, deserializeConfig),
             measurement,
-            exactSearch);
+            exactSearch,
+            ioSizeRecorder);
       } else {
         return metadataIndex.getChildIndexEntry(measurement, exactSearch);
       }
@@ -1543,10 +1664,12 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * read the chunk's header.
    *
    * @param position the file offset of this chunk's header
+   * @param ioSizeRecorder can be null
    */
-  private ChunkHeader readChunkHeader(long position) throws IOException {
+  private ChunkHeader readChunkHeader(long position, LongConsumer 
ioSizeRecorder)
+      throws IOException {
     try {
-      return ChunkHeader.deserializeFrom(tsFileInput, position);
+      return ChunkHeader.deserializeFrom(tsFileInput, position, 
ioSizeRecorder);
     } catch (StopReadTsFileByInterruptException e) {
       throw e;
     } catch (Throwable t) {
@@ -1563,8 +1686,21 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return the pages of this chunk
    */
   public ByteBuffer readChunk(long position, int dataSize) throws IOException {
+    return readChunk(position, dataSize, null);
+  }
+
+  /**
+   * notice, this function will modify channel's position.
+   *
+   * @param dataSize the size of chunk data
+   * @param position the offset of the chunk data
+   * @param ioSizeRecorder can be null
+   * @return the pages of this chunk
+   */
+  public ByteBuffer readChunk(long position, int dataSize, LongConsumer 
ioSizeRecorder)
+      throws IOException {
     try {
-      return readData(position, dataSize);
+      return readData(position, dataSize, ioSizeRecorder);
     } catch (StopReadTsFileByInterruptException e) {
       throw e;
     } catch (Throwable t) {
@@ -1579,10 +1715,18 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return -chunk
    */
   public Chunk readMemChunk(long offset) throws IOException {
+    return readMemChunk(offset, null);
+  }
+
+  /**
+   * @param ioSizeRecorder can be null
+   */
+  public Chunk readMemChunk(long offset, LongConsumer ioSizeRecorder) throws 
IOException {
     try {
-      ChunkHeader header = readChunkHeader(offset);
-      ByteBuffer buffer = readChunk(offset + header.getSerializedSize(), 
header.getDataSize());
-      return new Chunk(header, buffer, getEncryptParam());
+      ChunkHeader header = readChunkHeader(offset, ioSizeRecorder);
+      ByteBuffer buffer =
+          readChunk(offset + header.getSerializedSize(), header.getDataSize(), 
ioSizeRecorder);
+      return new Chunk(header, buffer, getEncryptParam(ioSizeRecorder));
     } catch (StopReadTsFileByInterruptException e) {
       throw e;
     } catch (Throwable t) {
@@ -1599,7 +1743,7 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    */
   public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
     try {
-      ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader());
+      ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), 
null);
       ByteBuffer buffer =
           readChunk(
               metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), 
header.getDataSize());
@@ -1624,7 +1768,7 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return chunk
    */
   public Chunk readMemChunk(CachedChunkLoaderImpl.ChunkCacheKey chunkCacheKey) 
throws IOException {
-    ChunkHeader header = 
readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader());
+    ChunkHeader header = 
readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader(), null);
     ByteBuffer buffer =
         readChunk(
             chunkCacheKey.getOffsetOfChunkHeader() + 
header.getSerializedSize(),
@@ -1664,7 +1808,7 @@ public class TsFileSequenceReader implements 
AutoCloseable {
       return null;
     }
     IChunkMetadata lastChunkMetadata = 
chunkMetadataList.get(chunkMetadataList.size() - 1);
-    ChunkHeader header = 
readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader());
+    ChunkHeader header = 
readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), null);
     return new MeasurementSchema(
         lastChunkMetadata.getMeasurementUid(),
         header.getDataType(),
@@ -1792,6 +1936,26 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return data that been read.
    */
   protected ByteBuffer readData(long position, int totalSize) throws 
IOException {
+    return readData(position, totalSize, null);
+  }
+
+  /**
+   * read data from tsFileInput, from the current position (if position = -1), 
or the given
+   * position. <br>
+   * if position = -1, the tsFileInput's position will be changed to the 
current position + real
+   * data size that has been read. Otherwise, the tsFileInput's position is not 
changed.
+   *
+   * @param position the start position of data in the tsFileInput, or the 
current position if
+   *     position = -1
+   * @param totalSize the size of data that want to read
+   * @param ioSizeRecorder can be null
+   * @return data that has been read.
+   */
+  protected ByteBuffer readData(long position, int totalSize, LongConsumer 
ioSizeRecorder)
+      throws IOException {
+    if (ioSizeRecorder != null) {
+      ioSizeRecorder.accept(totalSize);
+    }
     int allocateSize = Math.min(MAX_READ_BUFFER_SIZE, totalSize);
     int allocateNum = (int) Math.ceil((double) totalSize / allocateSize);
     ByteBuffer buffer = ByteBuffer.allocate(totalSize);
@@ -1833,8 +1997,23 @@ public class TsFileSequenceReader implements 
AutoCloseable {
    * @return data that been read.
    */
   protected ByteBuffer readData(long start, long end) throws IOException {
+    return readData(start, end, null);
+  }
+
+  /**
+   * read data from tsFileInput, from the current position (if position = -1), 
or the given
+   * position.
+   *
+   * @param start the start position of data in the tsFileInput, or the 
current position if position
+   *     = -1
+   * @param end the end position of data that want to read
+   * @param ioSizeRecorder can be null
+   * @return data that has been read.
+   */
+  protected ByteBuffer readData(long start, long end, LongConsumer 
ioSizeRecorder)
+      throws IOException {
     try {
-      return readData(start, (int) (end - start));
+      return readData(start, (int) (end - start), ioSizeRecorder);
     } catch (StopReadTsFileByInterruptException e) {
       throw e;
     } catch (Throwable t) {
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java 
b/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java
index 817f0f46..53be3fec 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java
@@ -25,19 +25,23 @@ import org.apache.tsfile.exception.NotImplementedException;
 import org.apache.tsfile.file.metadata.TsFileMetadata;
 
 import java.io.IOException;
+import java.util.function.LongConsumer;
 
 /** A class for reading unclosed tsfile. */
 public class UnClosedTsFileReader extends TsFileSequenceReader {
 
   private EncryptParameter encryptParam;
 
-  public UnClosedTsFileReader(String file) throws IOException {
-    super(file, false);
+  // ioSizeRecorder can be null
+  public UnClosedTsFileReader(String file, LongConsumer ioSizeRecorder) throws 
IOException {
+    super(file, false, ioSizeRecorder);
     encryptParam = EncryptUtils.encryptParam;
   }
 
-  public UnClosedTsFileReader(String file, EncryptParameter decryptParam) 
throws IOException {
-    super(file, false);
+  // ioSizeRecorder can be null
+  public UnClosedTsFileReader(
+      String file, EncryptParameter decryptParam, LongConsumer ioSizeRecorder) 
throws IOException {
+    super(file, false, ioSizeRecorder);
     this.encryptParam = encryptParam;
   }
 
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java
 
b/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java
index bf284515..e4bfbb44 100644
--- 
a/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java
+++ 
b/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java
@@ -73,7 +73,7 @@ public class TimeSeriesMetadataReadTest {
     // s4 should not be returned as result
     set.add("s4");
     List<TimeseriesMetadata> timeseriesMetadataList =
-        reader.readTimeseriesMetadata(path.getIDeviceID(), 
path.getMeasurement(), set);
+        reader.readTimeseriesMetadata(path.getIDeviceID(), 
path.getMeasurement(), set, false, null);
     Assert.assertEquals(3, timeseriesMetadataList.size());
     for (int i = 1; i <= timeseriesMetadataList.size(); i++) {
       Assert.assertEquals("s" + i, timeseriesMetadataList.get(i - 
1).getMeasurementId());
@@ -87,7 +87,7 @@ public class TimeSeriesMetadataReadTest {
     // so the result is not supposed to contain this measurement's timeseries 
metadata
     set.add("s8");
     timeseriesMetadataList =
-        reader.readTimeseriesMetadata(path.getIDeviceID(), 
path.getMeasurement(), set);
+        reader.readTimeseriesMetadata(path.getIDeviceID(), 
path.getMeasurement(), set, false, null);
     Assert.assertEquals(2, timeseriesMetadataList.size());
     for (int i = 5; i < 7; i++) {
       Assert.assertEquals("s" + i, timeseriesMetadataList.get(i - 
5).getMeasurementId());
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java
 
b/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java
index 62dc9bda..8e865413 100644
--- 
a/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java
+++ 
b/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java
@@ -57,7 +57,7 @@ public class UnClosedTsFileReaderTest {
     ChunkMetadata chunkMetadata =
         
writer.getChunkGroupMetadataList().get(0).getChunkMetadataList().get(0);
 
-    UnClosedTsFileReader reader = new 
UnClosedTsFileReader(file.getAbsolutePath());
+    UnClosedTsFileReader reader = new 
UnClosedTsFileReader(file.getAbsolutePath(), null);
     Chunk chunk = reader.readMemChunk(chunkMetadata);
     ChunkReader chunkReader = new ChunkReader(chunk);
     BatchData batchData = chunkReader.nextPageData();

Reply via email to