This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryIO-1.1 in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 29136c6f914e2d3dc8f71d2dec73a63a22e3f8ee Author: Jackie Tien <[email protected]> AuthorDate: Wed Nov 13 09:28:12 2024 +0800 Add LongConsumer ioSizeRecorder in TsFileSequenceReader for IoTDB scan --- .../org/apache/tsfile/file/header/ChunkHeader.java | 21 ++ .../apache/tsfile/read/TsFileSequenceReader.java | 254 +++++++++++++++++---- .../apache/tsfile/read/UnClosedTsFileReader.java | 6 +- .../tsfile/read/TimeSeriesMetadataReadTest.java | 4 +- 4 files changed, 240 insertions(+), 45 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java index 6cb3bee5..c07071fb 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.function.LongConsumer; public class ChunkHeader { @@ -189,7 +190,23 @@ public class ChunkHeader { * @throws IOException IOException */ public static ChunkHeader deserializeFrom(TsFileInput input, long offset) throws IOException { + return deserializeFrom(input, offset, null); + } + /** + * deserialize from TsFileInput, the marker has not been read. 
+ * + * @param input TsFileInput + * @param offset offset + * @param ioSizeRecorder can be null + * @return CHUNK_HEADER object + * @throws IOException IOException + */ + public static ChunkHeader deserializeFrom( + TsFileInput input, long offset, LongConsumer ioSizeRecorder) throws IOException { + + // only 6 bytes, no need to call ioSizeRecorder.accept alone, combine into the remaining read + // operation ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Integer.BYTES + 1); input.read(buffer, offset); buffer.flip(); @@ -208,6 +225,10 @@ public class ChunkHeader { + TSEncoding.getSerializedSize(); buffer = ByteBuffer.allocate(remainingBytes); + if (ioSizeRecorder != null) { + ioSizeRecorder.accept((long) alreadyReadLength + remainingBytes); + } + input.read(buffer, offset + alreadyReadLength); buffer.flip(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index fa9b8ab3..e0b4414a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -93,6 +93,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.LongConsumer; import java.util.stream.Collectors; public class TsFileSequenceReader implements AutoCloseable { @@ -130,7 +131,21 @@ public class TsFileSequenceReader implements AutoCloseable { * @throws IOException If some I/O error occurs */ public TsFileSequenceReader(String file) throws IOException { - this(file, true); + this(file, null); + } + + /** + * Create a file reader of the given file. 
The reader will read the tail of the file to get the + * file metadata size. Then the reader will skip the first + * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length + * bytes of the file for preparing reading real data. + * + * @param file the data file + * @param ioSizeRecorder can be null + * @throws IOException If some I/O error occurs + */ + public TsFileSequenceReader(String file, LongConsumer ioSizeRecorder) throws IOException { + this(file, true, ioSizeRecorder); } /** @@ -140,6 +155,18 @@ public class TsFileSequenceReader implements AutoCloseable { * @param loadMetadataSize -whether load meta data size */ public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException { + this(file, loadMetadataSize, null); + } + + /** + * construct function for TsFileSequenceReader. + * + * @param file -given file name + * @param loadMetadataSize -whether load meta data size + * @param ioSizeRecorder can be null + */ + public TsFileSequenceReader(String file, boolean loadMetadataSize, LongConsumer ioSizeRecorder) + throws IOException { if (resourceLogger.isDebugEnabled()) { resourceLogger.debug("{} reader is opened. 
{}", file, getClass().getName()); } @@ -147,7 +174,7 @@ public class TsFileSequenceReader implements AutoCloseable { tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file); try { if (loadMetadataSize) { - loadMetadataSize(); + loadMetadataSize(ioSizeRecorder); } } catch (Throwable e) { tsFileInput.close(); @@ -209,8 +236,19 @@ public class TsFileSequenceReader implements AutoCloseable { } public void loadMetadataSize() throws IOException { - ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES); + loadMetadataSize(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public void loadMetadataSize(LongConsumer ioSizeRecorder) throws IOException { + int readSize = Integer.BYTES; + ByteBuffer metadataSize = ByteBuffer.allocate(readSize); if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) { + if (ioSizeRecorder != null) { + ioSizeRecorder.accept(readSize); + } tsFileInput.read( metadataSize, tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES); @@ -283,7 +321,8 @@ public class TsFileSequenceReader implements AutoCloseable { /** this function reads version number and checks compatibility of TsFile. 
*/ public byte readVersionNumber() throws IOException { ByteBuffer versionNumberByte = ByteBuffer.allocate(Byte.BYTES); - tsFileInput.read(versionNumberByte, TSFileConfig.MAGIC_STRING.getBytes().length); + tsFileInput.read( + versionNumberByte, TSFileConfig.MAGIC_STRING.getBytes(TSFileConfig.STRING_CHARSET).length); versionNumberByte.flip(); return versionNumberByte.get(); } @@ -294,12 +333,20 @@ public class TsFileSequenceReader implements AutoCloseable { * @throws IOException io error */ public TsFileMetadata readFileMetadata() throws IOException { + return readFileMetadata(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public TsFileMetadata readFileMetadata(LongConsumer ioSizeRecorder) throws IOException { try { if (tsFileMetaData == null) { synchronized (this) { if (tsFileMetaData == null) { tsFileMetaData = - TsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize)); + TsFileMetadata.deserializeFrom( + readData(fileMetadataPos, fileMetadataSize, ioSizeRecorder)); } } } @@ -318,7 +365,14 @@ public class TsFileSequenceReader implements AutoCloseable { * @throws IOException io error */ public BloomFilter readBloomFilter() throws IOException { - readFileMetadata(); + return readBloomFilter(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public BloomFilter readBloomFilter(LongConsumer ioSizeRecorder) throws IOException { + readFileMetadata(ioSizeRecorder); return tsFileMetaData.getBloomFilter(); } @@ -375,17 +429,27 @@ public class TsFileSequenceReader implements AutoCloseable { public TimeseriesMetadata readTimeseriesMetadata( IDeviceID device, String measurement, boolean ignoreNotExists) throws IOException { - readFileMetadata(); + return readTimeseriesMetadata(device, measurement, ignoreNotExists, null); + } + + public TimeseriesMetadata readTimeseriesMetadata( + IDeviceID device, + String measurement, + boolean ignoreNotExistDevice, + LongConsumer ioSizeConsumer) + throws IOException { + 
readFileMetadata(ioSizeConsumer); MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); Pair<IMetadataIndexEntry, Long> metadataIndexPair = - getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true); + getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeConsumer); if (metadataIndexPair == null) { - if (ignoreNotExists) { + if (ignoreNotExistDevice) { return null; } throw new IOException("Device {" + device + "} is not in tsFileMetaData"); } - ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer); MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { try { @@ -395,14 +459,16 @@ public class TsFileSequenceReader implements AutoCloseable { throw e; } metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, measurement, false); + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, measurement, false, ioSizeConsumer); } if (metadataIndexPair == null) { return null; } List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); if (metadataIndexPair.right - metadataIndexPair.left.getOffset() < Integer.MAX_VALUE) { - buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer); while (buffer.hasRemaining()) { try { timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true)); @@ -413,6 +479,9 @@ public class TsFileSequenceReader implements AutoCloseable { } } } else { + if (ioSizeConsumer != null) { + ioSizeConsumer.accept(metadataIndexPair.right - metadataIndexPair.left.getOffset()); + } // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get 
timeseriesMetadataList tsFileInput.position(metadataIndexPair.left.getOffset()); @@ -433,14 +502,14 @@ public class TsFileSequenceReader implements AutoCloseable { } // This method is only used for TsFile - public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotExists) + public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotExistDevice) throws IOException { readFileMetadata(); MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); Pair<IMetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, path.getIDeviceID(), true); if (metadataIndexPair == null) { - if (ignoreNotExists) { + if (ignoreNotExistDevice) { return null; } throw new IOException("Device {" + path.getDevice() + "} is not in tsFileMetaData"); @@ -457,7 +526,8 @@ public class TsFileSequenceReader implements AutoCloseable { } firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode); metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, path.getMeasurement(), false); + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, path.getMeasurement(), false, null); if (metadataIndexPair == null) { return null; @@ -489,18 +559,33 @@ public class TsFileSequenceReader implements AutoCloseable { } } - /* Find the leaf node that contains path, return all the sensors in that leaf node which are also in allSensors set */ + /** + * Find the leaf node that contains path, return all the sensors in that leaf node which are also + * in allSensors set + * + * @param ignoreNotExistDevice whether throw IOException if device not found + * @param ioSizeRecorder can be null + */ public List<TimeseriesMetadata> readTimeseriesMetadata( - IDeviceID device, String measurement, Set<String> allSensors) throws IOException { + IDeviceID device, + String measurement, + Set<String> allSensors, + boolean ignoreNotExistDevice, + LongConsumer ioSizeRecorder) + throws 
IOException { Pair<IMetadataIndexEntry, Long> metadataIndexPair = - getLeafMetadataIndexPair(device, measurement); + getLeafMetadataIndexPair(device, measurement, ioSizeRecorder); if (metadataIndexPair == null) { - return Collections.emptyList(); + if (ignoreNotExistDevice) { + return Collections.emptyList(); + } + throw new IOException("Device {" + device + "} is not in tsFileMetaData"); } List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); if (metadataIndexPair.right - metadataIndexPair.left.getOffset() < Integer.MAX_VALUE) { - ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder); while (buffer.hasRemaining()) { TimeseriesMetadata timeseriesMetadata; try { @@ -518,6 +603,9 @@ public class TsFileSequenceReader implements AutoCloseable { // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get timeseriesMetadataList synchronized (this) { + if (ioSizeRecorder != null) { + ioSizeRecorder.accept(metadataIndexPair.right - metadataIndexPair.left.getOffset()); + } tsFileInput.position(metadataIndexPair.left.getOffset()); while (tsFileInput.position() < metadataIndexPair.right) { TimeseriesMetadata timeseriesMetadata; @@ -541,15 +629,16 @@ public class TsFileSequenceReader implements AutoCloseable { /* Get leaf MetadataIndexPair which contains path */ private Pair<IMetadataIndexEntry, Long> getLeafMetadataIndexPair( - IDeviceID device, String measurement) throws IOException { - readFileMetadata(); + IDeviceID device, String measurement, LongConsumer ioSizeRecorder) throws IOException { + readFileMetadata(ioSizeRecorder); MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); Pair<IMetadataIndexEntry, Long> metadataIndexPair = - getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true); + 
getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeRecorder); if (metadataIndexPair == null) { return null; } - ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder); MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { try { @@ -559,12 +648,12 @@ public class TsFileSequenceReader implements AutoCloseable { throw e; } metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, measurement, false); + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, measurement, false, ioSizeRecorder); } return metadataIndexPair; } - // This method is only used for TsFile public List<ITimeSeriesMetadata> readITimeseriesMetadata( IDeviceID device, Set<String> measurements) throws IOException { readFileMetadata(); @@ -601,7 +690,7 @@ public class TsFileSequenceReader implements AutoCloseable { timeseriesMetadataList.clear(); measurementMetadataIndexPair = getMetadataAndEndOffsetOfMeasurementNode( - measurementMetadataIndexNode, measurementList.get(i), false); + measurementMetadataIndexNode, measurementList.get(i), false, null); if (measurementMetadataIndexPair == null) { continue; @@ -1321,6 +1410,21 @@ public class TsFileSequenceReader implements AutoCloseable { */ protected Pair<IMetadataIndexEntry, Long> getMetadataAndEndOffsetOfDeviceNode( MetadataIndexNode metadataIndex, IDeviceID deviceID, boolean exactSearch) throws IOException { + return getMetadataAndEndOffsetOfDeviceNode(metadataIndex, deviceID, exactSearch, null); + } + + /** + * @param ioSizeRecorder can be null + */ + protected Pair<IMetadataIndexEntry, Long> getMetadataAndEndOffsetOfDeviceNode( + MetadataIndexNode metadataIndex, + IDeviceID deviceID, + boolean exactSearch, + LongConsumer ioSizeRecorder) + throws 
IOException { + if (metadataIndex == null) { + return null; + } if (MetadataIndexNodeType.INTERNAL_MEASUREMENT.equals(metadataIndex.getNodeType()) || MetadataIndexNodeType.LEAF_MEASUREMENT.equals(metadataIndex.getNodeType())) { throw new IllegalArgumentException(); @@ -1329,9 +1433,10 @@ public class TsFileSequenceReader implements AutoCloseable { if (MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType())) { Pair<IMetadataIndexEntry, Long> childIndexEntry = metadataIndex.getChildIndexEntry(deviceID, false); - ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right); + ByteBuffer buffer = + readData(childIndexEntry.left.getOffset(), childIndexEntry.right, ioSizeRecorder); return getMetadataAndEndOffsetOfDeviceNode( - MetadataIndexNode.deserializeFrom(buffer, true), deviceID, exactSearch); + MetadataIndexNode.deserializeFrom(buffer, true), deviceID, exactSearch, ioSizeRecorder); } else { return metadataIndex.getChildIndexEntry(deviceID, exactSearch); } @@ -1350,10 +1455,15 @@ public class TsFileSequenceReader implements AutoCloseable { * @param measurement target measurement * @param exactSearch whether is in exact search mode, return null when there is no entry with * name; or else return the nearest MetadataIndexEntry before it (for deeper search) + * @param ioSizeRecorder can be null * @return target MetadataIndexEntry, endOffset pair */ protected Pair<IMetadataIndexEntry, Long> getMetadataAndEndOffsetOfMeasurementNode( - MetadataIndexNode metadataIndex, String measurement, boolean exactSearch) throws IOException { + MetadataIndexNode metadataIndex, + String measurement, + boolean exactSearch, + LongConsumer ioSizeRecorder) + throws IOException { if (MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType()) || MetadataIndexNodeType.LEAF_DEVICE.equals(metadataIndex.getNodeType())) { throw new IllegalArgumentException(); @@ -1362,9 +1472,13 @@ public class TsFileSequenceReader implements AutoCloseable { 
if (MetadataIndexNodeType.INTERNAL_MEASUREMENT.equals(metadataIndex.getNodeType())) { Pair<IMetadataIndexEntry, Long> childIndexEntry = metadataIndex.getChildIndexEntry(measurement, false); - ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right); + ByteBuffer buffer = + readData(childIndexEntry.left.getOffset(), childIndexEntry.right, ioSizeRecorder); return getMetadataAndEndOffsetOfMeasurementNode( - MetadataIndexNode.deserializeFrom(buffer, false), measurement, exactSearch); + MetadataIndexNode.deserializeFrom(buffer, false), + measurement, + exactSearch, + ioSizeRecorder); } else { return metadataIndex.getChildIndexEntry(measurement, exactSearch); } @@ -1437,10 +1551,12 @@ public class TsFileSequenceReader implements AutoCloseable { * read the chunk's header. * * @param position the file offset of this chunk's header + * @param ioSizeRecorder can be null */ - private ChunkHeader readChunkHeader(long position) throws IOException { + private ChunkHeader readChunkHeader(long position, LongConsumer ioSizeRecorder) + throws IOException { try { - return ChunkHeader.deserializeFrom(tsFileInput, position); + return ChunkHeader.deserializeFrom(tsFileInput, position, ioSizeRecorder); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { @@ -1457,8 +1573,21 @@ public class TsFileSequenceReader implements AutoCloseable { * @return the pages of this chunk */ public ByteBuffer readChunk(long position, int dataSize) throws IOException { + return readChunk(position, dataSize, null); + } + + /** + * notice, this function will modify channel's position. 
+ * + * @param dataSize the size of chunkdata + * @param position the offset of the chunk data + * @param ioSizeRecorder can be null + * @return the pages of this chunk + */ + public ByteBuffer readChunk(long position, int dataSize, LongConsumer ioSizeRecorder) + throws IOException { try { - return readData(position, dataSize); + return readData(position, dataSize, ioSizeRecorder); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { @@ -1473,9 +1602,17 @@ public class TsFileSequenceReader implements AutoCloseable { * @return -chunk */ public Chunk readMemChunk(long offset) throws IOException { + return readMemChunk(offset, null); + } + + /** + * @param ioSizeRecorder can be null + */ + public Chunk readMemChunk(long offset, LongConsumer ioSizeRecorder) throws IOException { try { - ChunkHeader header = readChunkHeader(offset); - ByteBuffer buffer = readChunk(offset + header.getSerializedSize(), header.getDataSize()); + ChunkHeader header = readChunkHeader(offset, ioSizeRecorder); + ByteBuffer buffer = + readChunk(offset + header.getSerializedSize(), header.getDataSize(), ioSizeRecorder); return new Chunk(header, buffer); } catch (StopReadTsFileByInterruptException e) { throw e; @@ -1493,7 +1630,7 @@ public class TsFileSequenceReader implements AutoCloseable { */ public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { try { - ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader()); + ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), null); ByteBuffer buffer = readChunk( metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), header.getDataSize()); @@ -1513,7 +1650,7 @@ public class TsFileSequenceReader implements AutoCloseable { * @return chunk */ public Chunk readMemChunk(CachedChunkLoaderImpl.ChunkCacheKey chunkCacheKey) throws IOException { - ChunkHeader header = readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader()); + ChunkHeader header = 
readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader(), null); ByteBuffer buffer = readChunk( chunkCacheKey.getOffsetOfChunkHeader() + header.getSerializedSize(), @@ -1550,7 +1687,7 @@ public class TsFileSequenceReader implements AutoCloseable { return null; } IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1); - ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader()); + ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), null); return new MeasurementSchema( lastChunkMetadata.getMeasurementUid(), header.getDataType(), @@ -1643,6 +1780,26 @@ public class TsFileSequenceReader implements AutoCloseable { * @return data that been read. */ protected ByteBuffer readData(long position, int totalSize) throws IOException { + return readData(position, totalSize, null); + } + + /** + * read data from tsFileInput, from the current position (if position = -1), or the given + * position. <br> + * if position = -1, the tsFileInput's position will be changed to the current position + real + * data size that been read. Other wise, the tsFileInput's position is not changed. + * + * @param position the start position of data in the tsFileInput, or the current position if + * position = -1 + * @param totalSize the size of data that want to read + * @param ioSizeRecorder can be null + * @return data that been read. + */ + protected ByteBuffer readData(long position, int totalSize, LongConsumer ioSizeRecorder) + throws IOException { + if (ioSizeRecorder != null) { + ioSizeRecorder.accept(totalSize); + } int allocateSize = Math.min(MAX_READ_BUFFER_SIZE, totalSize); int allocateNum = (int) Math.ceil((double) totalSize / allocateSize); ByteBuffer buffer = ByteBuffer.allocate(totalSize); @@ -1684,8 +1841,23 @@ public class TsFileSequenceReader implements AutoCloseable { * @return data that been read. 
*/ protected ByteBuffer readData(long start, long end) throws IOException { + return readData(start, end, null); + } + + /** + * read data from tsFileInput, from the current position (if position = -1), or the given + * position. + * + * @param start the start position of data in the tsFileInput, or the current position if position + * = -1 + * @param end the end position of data that want to read + * @param ioSizeRecorder can be null + * @return data that been read. + */ + protected ByteBuffer readData(long start, long end, LongConsumer ioSizeRecorder) + throws IOException { try { - return readData(start, (int) (end - start)); + return readData(start, (int) (end - start), ioSizeRecorder); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java index 4c19dac9..7c9e069d 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java @@ -23,12 +23,14 @@ import org.apache.tsfile.exception.NotImplementedException; import org.apache.tsfile.file.metadata.TsFileMetadata; import java.io.IOException; +import java.util.function.LongConsumer; /** A class for reading unclosed tsfile. */ public class UnClosedTsFileReader extends TsFileSequenceReader { - public UnClosedTsFileReader(String file) throws IOException { - super(file, false); + // ioSizeRecorder can be null + public UnClosedTsFileReader(String file, LongConsumer ioSizeRecorder) throws IOException { + super(file, false, ioSizeRecorder); } /** unclosed file has no tail magic data. 
*/ diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java index bf284515..e4bfbb44 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java @@ -73,7 +73,7 @@ public class TimeSeriesMetadataReadTest { // s4 should not be returned as result set.add("s4"); List<TimeseriesMetadata> timeseriesMetadataList = - reader.readTimeseriesMetadata(path.getIDeviceID(), path.getMeasurement(), set); + reader.readTimeseriesMetadata(path.getIDeviceID(), path.getMeasurement(), set, false, null); Assert.assertEquals(3, timeseriesMetadataList.size()); for (int i = 1; i <= timeseriesMetadataList.size(); i++) { Assert.assertEquals("s" + i, timeseriesMetadataList.get(i - 1).getMeasurementId()); @@ -87,7 +87,7 @@ public class TimeSeriesMetadataReadTest { // so the result is not supposed to contain this measurement's timeseries metadata set.add("s8"); timeseriesMetadataList = - reader.readTimeseriesMetadata(path.getIDeviceID(), path.getMeasurement(), set); + reader.readTimeseriesMetadata(path.getIDeviceID(), path.getMeasurement(), set, false, null); Assert.assertEquals(2, timeseriesMetadataList.size()); for (int i = 5; i < 7; i++) { Assert.assertEquals("s" + i, timeseriesMetadataList.get(i - 5).getMeasurementId());
