This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryIORecord in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 88c9ebd47433d301e16e89eaa90a8b48f77058b2 Author: JackieTien97 <[email protected]> AuthorDate: Tue Nov 12 15:54:20 2024 +0800 Add LongConsumer ioSizeRecorder in TsFileSequenceReader for IoTDB scan --- .../org/apache/tsfile/file/header/ChunkHeader.java | 21 +++ .../apache/tsfile/read/TsFileSequenceReader.java | 173 +++++++++++++++++---- 2 files changed, 166 insertions(+), 28 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java index 6cb3bee5..c07071fb 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.function.LongConsumer; public class ChunkHeader { @@ -189,7 +190,23 @@ public class ChunkHeader { * @throws IOException IOException */ public static ChunkHeader deserializeFrom(TsFileInput input, long offset) throws IOException { + return deserializeFrom(input, offset, null); + } + /** + * deserialize from TsFileInput, the marker has not been read. 
+ * + * @param input TsFileInput + * @param offset offset + * @param ioSizeRecorder can be null + * @return CHUNK_HEADER object + * @throws IOException IOException + */ + public static ChunkHeader deserializeFrom( + TsFileInput input, long offset, LongConsumer ioSizeRecorder) throws IOException { + + // only 6 bytes, no need to call ioSizeRecorder.accept alone, combine into the remaining read + // operation ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Integer.BYTES + 1); input.read(buffer, offset); buffer.flip(); @@ -208,6 +225,10 @@ public class ChunkHeader { + TSEncoding.getSerializedSize(); buffer = ByteBuffer.allocate(remainingBytes); + if (ioSizeRecorder != null) { + ioSizeRecorder.accept((long) alreadyReadLength + remainingBytes); + } + input.read(buffer, offset + alreadyReadLength); buffer.flip(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index 1374a6d4..7e78187c 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -103,6 +103,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.LongConsumer; import java.util.stream.Collectors; public class TsFileSequenceReader implements AutoCloseable { @@ -340,13 +341,20 @@ public class TsFileSequenceReader implements AutoCloseable { * @throws IOException io error */ public TsFileMetadata readFileMetadata() throws IOException { + return readFileMetadata(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public TsFileMetadata readFileMetadata(LongConsumer ioSizeRecorder) throws IOException { try { if (tsFileMetaData == null) { synchronized (this) { if (tsFileMetaData == null) { tsFileMetaData = 
deserializeConfig.tsFileMetadataBufferDeserializer.deserialize( - readData(fileMetadataPos, fileMetadataSize), deserializeConfig); + readData(fileMetadataPos, fileMetadataSize, ioSizeRecorder), deserializeConfig); } } } @@ -365,7 +373,14 @@ public class TsFileSequenceReader implements AutoCloseable { * @throws IOException io error */ public BloomFilter readBloomFilter() throws IOException { - readFileMetadata(); + return readBloomFilter(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public BloomFilter readBloomFilter(LongConsumer ioSizeRecorder) throws IOException { + readFileMetadata(ioSizeRecorder); return tsFileMetaData.getBloomFilter(); } @@ -378,8 +393,15 @@ public class TsFileSequenceReader implements AutoCloseable { * @throws IOException if an I/O error occurs while reading the file metadata */ public EncryptParameter getEncryptParam() throws IOException { + return getEncryptParam(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public EncryptParameter getEncryptParam(LongConsumer ioSizeRecorder) throws IOException { if (fileMetadataSize != 0) { - readFileMetadata(); + readFileMetadata(ioSizeRecorder); return tsFileMetaData.getEncryptParam(); } return EncryptUtils.encryptParam; @@ -461,7 +483,7 @@ public class TsFileSequenceReader implements AutoCloseable { throw e; } metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, measurement, false); + getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, measurement, false, null); } if (metadataIndexPair == null) { return null; @@ -526,7 +548,8 @@ public class TsFileSequenceReader implements AutoCloseable { } firstTimeseriesMetadata = getTimeColumnMetadata(metadataIndexNode); metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, path.getMeasurement(), false); + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, path.getMeasurement(), false, null); if (metadataIndexPair == null) { return null; @@ -561,15 +584,25 
@@ public class TsFileSequenceReader implements AutoCloseable { /* Find the leaf node that contains path, return all the sensors in that leaf node which are also in allSensors set */ public List<TimeseriesMetadata> readTimeseriesMetadata( IDeviceID device, String measurement, Set<String> allSensors) throws IOException { + return readTimeseriesMetadata(device, measurement, allSensors, null); + } + + /** + * @param ioSizeRecorder can be null + */ + public List<TimeseriesMetadata> readTimeseriesMetadata( + IDeviceID device, String measurement, Set<String> allSensors, LongConsumer ioSizeRecorder) + throws IOException { Pair<IMetadataIndexEntry, Long> metadataIndexPair = - getLeafMetadataIndexPair(device, measurement); + getLeafMetadataIndexPair(device, measurement, ioSizeRecorder); if (metadataIndexPair == null) { return Collections.emptyList(); } List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); if (metadataIndexPair.right - metadataIndexPair.left.getOffset() < Integer.MAX_VALUE) { - ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder); while (buffer.hasRemaining()) { TimeseriesMetadata timeseriesMetadata; try { @@ -587,6 +620,9 @@ public class TsFileSequenceReader implements AutoCloseable { // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get timeseriesMetadataList synchronized (this) { + if (ioSizeRecorder != null) { + ioSizeRecorder.accept(metadataIndexPair.right - metadataIndexPair.left.getOffset()); + } tsFileInput.position(metadataIndexPair.left.getOffset()); while (tsFileInput.position() < metadataIndexPair.right) { TimeseriesMetadata timeseriesMetadata; @@ -610,16 +646,17 @@ public class TsFileSequenceReader implements AutoCloseable { /* Get leaf MetadataIndexPair which contains path */ private Pair<IMetadataIndexEntry, Long> getLeafMetadataIndexPair( - IDeviceID 
device, String measurement) throws IOException { - readFileMetadata(); + IDeviceID device, String measurement, LongConsumer ioSizeRecorder) throws IOException { + readFileMetadata(ioSizeRecorder); MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getTableMetadataIndexNode(device.getTableName()); Pair<IMetadataIndexEntry, Long> metadataIndexPair = - getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true); + getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeRecorder); if (metadataIndexPair == null) { return null; } - ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder); MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { try { @@ -631,7 +668,8 @@ public class TsFileSequenceReader implements AutoCloseable { throw e; } metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, measurement, false); + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, measurement, false, ioSizeRecorder); } return metadataIndexPair; } @@ -700,7 +738,7 @@ public class TsFileSequenceReader implements AutoCloseable { List<TimeseriesMetadata> timeseriesMetadataList, MetadataIndexNode node, String measurement) throws IOException { Pair<IMetadataIndexEntry, Long> measurementMetadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false); + getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false, null); if (measurementMetadataIndexPair == null) { return false; @@ -1400,6 +1438,18 @@ public class TsFileSequenceReader implements AutoCloseable { */ protected Pair<IMetadataIndexEntry, Long> getMetadataAndEndOffsetOfDeviceNode( MetadataIndexNode metadataIndex, IDeviceID deviceID, boolean exactSearch) throws IOException { + return 
getMetadataAndEndOffsetOfDeviceNode(metadataIndex, deviceID, exactSearch, null); + } + + /** + * @param ioSizeRecorder can be null + */ + protected Pair<IMetadataIndexEntry, Long> getMetadataAndEndOffsetOfDeviceNode( + MetadataIndexNode metadataIndex, + IDeviceID deviceID, + boolean exactSearch, + LongConsumer ioSizeRecorder) + throws IOException { if (metadataIndex == null) { return null; } @@ -1411,12 +1461,14 @@ public class TsFileSequenceReader implements AutoCloseable { if (MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType())) { Pair<IMetadataIndexEntry, Long> childIndexEntry = metadataIndex.getChildIndexEntry(deviceID, false); - ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right); + ByteBuffer buffer = + readData(childIndexEntry.left.getOffset(), childIndexEntry.right, ioSizeRecorder); return getMetadataAndEndOffsetOfDeviceNode( deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize( buffer, deserializeConfig), deviceID, - exactSearch); + exactSearch, + ioSizeRecorder); } else { return metadataIndex.getChildIndexEntry(deviceID, exactSearch); } @@ -1435,10 +1487,15 @@ public class TsFileSequenceReader implements AutoCloseable { * @param measurement target measurement * @param exactSearch whether is in exact search mode, return null when there is no entry with * name; or else return the nearest MetadataIndexEntry before it (for deeper search) + * @param ioSizeRecorder can be null * @return target MetadataIndexEntry, endOffset pair */ protected Pair<IMetadataIndexEntry, Long> getMetadataAndEndOffsetOfMeasurementNode( - MetadataIndexNode metadataIndex, String measurement, boolean exactSearch) throws IOException { + MetadataIndexNode metadataIndex, + String measurement, + boolean exactSearch, + LongConsumer ioSizeRecorder) + throws IOException { if (MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType()) || 
MetadataIndexNodeType.LEAF_DEVICE.equals(metadataIndex.getNodeType())) { throw new IllegalArgumentException(); @@ -1447,12 +1504,14 @@ public class TsFileSequenceReader implements AutoCloseable { if (MetadataIndexNodeType.INTERNAL_MEASUREMENT.equals(metadataIndex.getNodeType())) { Pair<IMetadataIndexEntry, Long> childIndexEntry = metadataIndex.getChildIndexEntry(measurement, false); - ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right); + ByteBuffer buffer = + readData(childIndexEntry.left.getOffset(), childIndexEntry.right, ioSizeRecorder); return getMetadataAndEndOffsetOfMeasurementNode( deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize( buffer, deserializeConfig), measurement, - exactSearch); + exactSearch, + ioSizeRecorder); } else { return metadataIndex.getChildIndexEntry(measurement, exactSearch); } @@ -1527,10 +1586,12 @@ public class TsFileSequenceReader implements AutoCloseable { * read the chunk's header. * * @param position the file offset of this chunk's header + * @param ioSizeRecorder can be null */ - private ChunkHeader readChunkHeader(long position) throws IOException { + private ChunkHeader readChunkHeader(long position, LongConsumer ioSizeRecorder) + throws IOException { try { - return ChunkHeader.deserializeFrom(tsFileInput, position); + return ChunkHeader.deserializeFrom(tsFileInput, position, ioSizeRecorder); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { @@ -1547,8 +1608,21 @@ public class TsFileSequenceReader implements AutoCloseable { * @return the pages of this chunk */ public ByteBuffer readChunk(long position, int dataSize) throws IOException { + return readChunk(position, dataSize, null); + } + + /** + * notice, this function will modify channel's position. 
+ * + * @param dataSize the size of chunkdata + * @param position the offset of the chunk data + * @param ioSizeRecorder can be null + * @return the pages of this chunk + */ + public ByteBuffer readChunk(long position, int dataSize, LongConsumer ioSizeRecorder) + throws IOException { try { - return readData(position, dataSize); + return readData(position, dataSize, ioSizeRecorder); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { @@ -1563,10 +1637,18 @@ public class TsFileSequenceReader implements AutoCloseable { * @return -chunk */ public Chunk readMemChunk(long offset) throws IOException { + return readMemChunk(offset, null); + } + + /** + * @param ioSizeRecorder can be null + */ + public Chunk readMemChunk(long offset, LongConsumer ioSizeRecorder) throws IOException { try { - ChunkHeader header = readChunkHeader(offset); - ByteBuffer buffer = readChunk(offset + header.getSerializedSize(), header.getDataSize()); - return new Chunk(header, buffer, getEncryptParam()); + ChunkHeader header = readChunkHeader(offset, ioSizeRecorder); + ByteBuffer buffer = + readChunk(offset + header.getSerializedSize(), header.getDataSize(), ioSizeRecorder); + return new Chunk(header, buffer, getEncryptParam(ioSizeRecorder)); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { @@ -1583,7 +1665,7 @@ public class TsFileSequenceReader implements AutoCloseable { */ public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { try { - ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader()); + ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), null); ByteBuffer buffer = readChunk( metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), header.getDataSize()); @@ -1608,7 +1690,7 @@ public class TsFileSequenceReader implements AutoCloseable { * @return chunk */ public Chunk readMemChunk(CachedChunkLoaderImpl.ChunkCacheKey chunkCacheKey) throws IOException { - ChunkHeader header 
= readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader()); + ChunkHeader header = readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader(), null); ByteBuffer buffer = readChunk( chunkCacheKey.getOffsetOfChunkHeader() + header.getSerializedSize(), @@ -1648,7 +1730,7 @@ public class TsFileSequenceReader implements AutoCloseable { return null; } IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1); - ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader()); + ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), null); return new MeasurementSchema( lastChunkMetadata.getMeasurementUid(), header.getDataType(), @@ -1776,6 +1858,26 @@ public class TsFileSequenceReader implements AutoCloseable { * @return data that been read. */ protected ByteBuffer readData(long position, int totalSize) throws IOException { + return readData(position, totalSize, null); + } + + /** + * read data from tsFileInput, from the current position (if position = -1), or the given + * position. <br> + * if position = -1, the tsFileInput's position will be changed to the current position + real + * data size that been read. Otherwise, the tsFileInput's position is not changed. + * + * @param position the start position of data in the tsFileInput, or the current position if + * position = -1 + * @param totalSize the size of data that want to read + * @param ioSizeRecorder can be null + * @return data that been read. + */ + protected ByteBuffer readData(long position, int totalSize, LongConsumer ioSizeRecorder) + throws IOException { + if (ioSizeRecorder != null) { + ioSizeRecorder.accept(totalSize); + } int allocateSize = Math.min(MAX_READ_BUFFER_SIZE, totalSize); int allocateNum = (int) Math.ceil((double) totalSize / allocateSize); ByteBuffer buffer = ByteBuffer.allocate(totalSize); @@ -1817,8 +1919,23 @@ public class TsFileSequenceReader implements AutoCloseable { * @return data that been read. 
*/ protected ByteBuffer readData(long start, long end) throws IOException { + return readData(start, end, null); + } + + /** + * read data from tsFileInput, from the current position (if position = -1), or the given + * position. + * + * @param start the start position of data in the tsFileInput, or the current position if position + * = -1 + * @param end the end position of data that want to read + * @param ioSizeRecorder can be null + * @return data that been read. + */ + protected ByteBuffer readData(long start, long end, LongConsumer ioSizeRecorder) + throws IOException { try { - return readData(start, (int) (end - start)); + return readData(start, (int) (end - start), ioSizeRecorder); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) {
