This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch iotdb
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/iotdb by this push:
new af5cc2e0 Add LongConsumer ioSizeRecorder in TsFileSequenceReader for
IoTDB scan
af5cc2e0 is described below
commit af5cc2e0e4e8a6b81cec172f4155890e201cc8a8
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Nov 13 09:28:12 2024 +0800
Add LongConsumer ioSizeRecorder in TsFileSequenceReader for IoTDB scan
---
.../org/apache/tsfile/file/header/ChunkHeader.java | 21 ++
.../apache/tsfile/read/TsFileSequenceReader.java | 269 +++++++++++++++++----
.../apache/tsfile/read/UnClosedTsFileReader.java | 12 +-
.../tsfile/read/TimeSeriesMetadataReadTest.java | 4 +-
.../tsfile/read/UnClosedTsFileReaderTest.java | 2 +-
5 files changed, 256 insertions(+), 52 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
index 6cb3bee5..c07071fb 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
@@ -36,6 +36,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.LongConsumer;
public class ChunkHeader {
@@ -189,7 +190,23 @@ public class ChunkHeader {
* @throws IOException IOException
*/
public static ChunkHeader deserializeFrom(TsFileInput input, long offset)
throws IOException {
+ return deserializeFrom(input, offset, null);
+ }
+ /**
+ * deserialize from TsFileInput, the marker has not been read.
+ *
+ * @param input TsFileInput
+ * @param offset offset
+ * @param ioSizeRecorder can be null
+ * @return CHUNK_HEADER object
+ * @throws IOException IOException
+ */
+ public static ChunkHeader deserializeFrom(
+ TsFileInput input, long offset, LongConsumer ioSizeRecorder) throws
IOException {
+
+ // only 6 bytes, no need to call ioSizeRecorder.accept alone, combine into
the remaining read
+ // operation
ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Integer.BYTES + 1);
input.read(buffer, offset);
buffer.flip();
@@ -208,6 +225,10 @@ public class ChunkHeader {
+ TSEncoding.getSerializedSize();
buffer = ByteBuffer.allocate(remainingBytes);
+ if (ioSizeRecorder != null) {
+ ioSizeRecorder.accept((long) alreadyReadLength + remainingBytes);
+ }
+
input.read(buffer, offset + alreadyReadLength);
buffer.flip();
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index d55daf26..06ec5466 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -103,6 +103,7 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.LongConsumer;
import java.util.stream.Collectors;
public class TsFileSequenceReader implements AutoCloseable {
@@ -144,7 +145,21 @@ public class TsFileSequenceReader implements AutoCloseable
{
* @throws IOException If some I/O error occurs
*/
public TsFileSequenceReader(String file) throws IOException {
- this(file, true);
+ this(file, null);
+ }
+
+ /**
+ * Create a file reader of the given file. The reader will read the tail of
the file to get the
+ * file metadata size. Then the reader will skip the first
+ * TSFileConfig.MAGIC_STRING.getBytes().length +
TSFileConfig.NUMBER_VERSION.getBytes().length
+ * bytes of the file for preparing reading real data.
+ *
+ * @param file the data file
+ * @param ioSizeRecorder can be null
+ * @throws IOException If some I/O error occurs
+ */
+ public TsFileSequenceReader(String file, LongConsumer ioSizeRecorder) throws
IOException {
+ this(file, true, ioSizeRecorder);
}
/**
@@ -154,6 +169,18 @@ public class TsFileSequenceReader implements AutoCloseable
{
* @param loadMetadataSize -whether load meta data size
*/
public TsFileSequenceReader(String file, boolean loadMetadataSize) throws
IOException {
+ this(file, loadMetadataSize, null);
+ }
+
+ /**
+ * construct function for TsFileSequenceReader.
+ *
+ * @param file -given file name
+ * @param loadMetadataSize -whether load meta data size
+ * @param ioSizeRecorder can be null
+ */
+ public TsFileSequenceReader(String file, boolean loadMetadataSize,
LongConsumer ioSizeRecorder)
+ throws IOException {
if (resourceLogger.isDebugEnabled()) {
resourceLogger.debug("{} reader is opened. {}", file,
getClass().getName());
}
@@ -161,9 +188,9 @@ public class TsFileSequenceReader implements AutoCloseable {
tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
try {
- loadFileVersion();
+ loadFileVersion(ioSizeRecorder);
if (loadMetadataSize) {
- loadMetadataSize();
+ loadMetadataSize(ioSizeRecorder);
}
} catch (Throwable e) {
tsFileInput.close();
@@ -224,10 +251,14 @@ public class TsFileSequenceReader implements
AutoCloseable {
this.fileMetadataSize = fileMetadataSize;
}
- private void loadFileVersion() throws IOException {
+ // ioSizeRecorder can be null
+ private void loadFileVersion(LongConsumer ioSizeRecorder) throws IOException
{
try {
tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes(TSFileConfig.STRING_CHARSET).length);
- final ByteBuffer buffer = ByteBuffer.allocate(1);
+ final ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES);
+ if (ioSizeRecorder != null) {
+ ioSizeRecorder.accept(Byte.BYTES);
+ }
tsFileInput.read(buffer);
buffer.flip();
fileVersion = buffer.get();
@@ -255,8 +286,19 @@ public class TsFileSequenceReader implements AutoCloseable
{
}
public void loadMetadataSize() throws IOException {
- ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+ loadMetadataSize(null);
+ }
+
+ /**
+ * @param ioSizeRecorder can be null
+ */
+ public void loadMetadataSize(LongConsumer ioSizeRecorder) throws IOException
{
+ int readSize = Integer.BYTES;
+ ByteBuffer metadataSize = ByteBuffer.allocate(readSize);
if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
+ if (ioSizeRecorder != null) {
+ ioSizeRecorder.accept(readSize);
+ }
tsFileInput.read(
metadataSize,
tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length -
Integer.BYTES);
@@ -329,7 +371,8 @@ public class TsFileSequenceReader implements AutoCloseable {
/** this function reads version number and checks compatibility of TsFile. */
public byte readVersionNumber() throws IOException {
ByteBuffer versionNumberByte = ByteBuffer.allocate(Byte.BYTES);
- tsFileInput.read(versionNumberByte,
TSFileConfig.MAGIC_STRING.getBytes().length);
+ tsFileInput.read(
+ versionNumberByte,
TSFileConfig.MAGIC_STRING.getBytes(TSFileConfig.STRING_CHARSET).length);
versionNumberByte.flip();
return versionNumberByte.get();
}
@@ -340,13 +383,20 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @throws IOException io error
*/
public TsFileMetadata readFileMetadata() throws IOException {
+ return readFileMetadata(null);
+ }
+
+ /**
+ * @param ioSizeRecorder can be null
+ */
+ public TsFileMetadata readFileMetadata(LongConsumer ioSizeRecorder) throws
IOException {
try {
if (tsFileMetaData == null) {
synchronized (this) {
if (tsFileMetaData == null) {
tsFileMetaData =
deserializeConfig.tsFileMetadataBufferDeserializer.deserialize(
- readData(fileMetadataPos, fileMetadataSize),
deserializeConfig);
+ readData(fileMetadataPos, fileMetadataSize,
ioSizeRecorder), deserializeConfig);
}
}
}
@@ -365,7 +415,14 @@ public class TsFileSequenceReader implements AutoCloseable
{
* @throws IOException io error
*/
public BloomFilter readBloomFilter() throws IOException {
- readFileMetadata();
+ return readBloomFilter(null);
+ }
+
+ /**
+ * @param ioSizeRecorder can be null
+ */
+ public BloomFilter readBloomFilter(LongConsumer ioSizeRecorder) throws
IOException {
+ readFileMetadata(ioSizeRecorder);
return tsFileMetaData.getBloomFilter();
}
@@ -378,8 +435,15 @@ public class TsFileSequenceReader implements AutoCloseable
{
* @throws IOException if an I/O error occurs while reading the file metadata
*/
public EncryptParameter getEncryptParam() throws IOException {
+ return getEncryptParam(null);
+ }
+
+ /**
+ * @param ioSizeRecorder can be null
+ */
+ public EncryptParameter getEncryptParam(LongConsumer ioSizeRecorder) throws
IOException {
if (fileMetadataSize != 0) {
- readFileMetadata();
+ readFileMetadata(ioSizeRecorder);
return tsFileMetaData.getEncryptParam();
}
return EncryptUtils.encryptParam;
@@ -438,18 +502,28 @@ public class TsFileSequenceReader implements
AutoCloseable {
public TimeseriesMetadata readTimeseriesMetadata(
IDeviceID device, String measurement, boolean ignoreNotExists) throws
IOException {
- readFileMetadata();
+ return readTimeseriesMetadata(device, measurement, ignoreNotExists, null);
+ }
+
+ public TimeseriesMetadata readTimeseriesMetadata(
+ IDeviceID device,
+ String measurement,
+ boolean ignoreNotExistDevice,
+ LongConsumer ioSizeConsumer)
+ throws IOException {
+ readFileMetadata(ioSizeConsumer);
MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
- getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device,
true);
+ getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device,
true, ioSizeConsumer);
if (metadataIndexPair == null) {
- if (ignoreNotExists) {
+ if (ignoreNotExistDevice) {
return null;
}
throw new IOException("Device {" + device + "} is not in
tsFileMetaData");
}
- ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
+ ByteBuffer buffer =
+ readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right,
ioSizeConsumer);
MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
if
(!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT))
{
try {
@@ -461,14 +535,16 @@ public class TsFileSequenceReader implements
AutoCloseable {
throw e;
}
metadataIndexPair =
- getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode,
measurement, false);
+ getMetadataAndEndOffsetOfMeasurementNode(
+ metadataIndexNode, measurement, false, ioSizeConsumer);
}
if (metadataIndexPair == null) {
return null;
}
List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
if (metadataIndexPair.right - metadataIndexPair.left.getOffset() <
Integer.MAX_VALUE) {
- buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
+ buffer =
+ readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right, ioSizeConsumer);
while (buffer.hasRemaining()) {
try {
timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
@@ -479,6 +555,9 @@ public class TsFileSequenceReader implements AutoCloseable {
}
}
} else {
+ if (ioSizeConsumer != null) {
+ ioSizeConsumer.accept(metadataIndexPair.right -
metadataIndexPair.left.getOffset());
+ }
// when the buffer length is over than Integer.MAX_VALUE,
// using tsFileInput to get timeseriesMetadataList
tsFileInput.position(metadataIndexPair.left.getOffset());
@@ -499,7 +578,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
// This method is only used for TsFile
- public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean
ignoreNotExists)
+ public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean
ignoreNotExistDevice)
throws IOException {
readFileMetadata();
MetadataIndexNode deviceMetadataIndexNode =
@@ -507,7 +586,7 @@ public class TsFileSequenceReader implements AutoCloseable {
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode,
path.getIDeviceID(), true);
if (metadataIndexPair == null) {
- if (ignoreNotExists) {
+ if (ignoreNotExistDevice) {
return null;
}
throw new IOException("Device {" + path.getDeviceString() + "} is not in
tsFileMetaData");
@@ -526,7 +605,8 @@ public class TsFileSequenceReader implements AutoCloseable {
}
firstTimeseriesMetadata = getTimeColumnMetadata(metadataIndexNode);
metadataIndexPair =
- getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode,
path.getMeasurement(), false);
+ getMetadataAndEndOffsetOfMeasurementNode(
+ metadataIndexNode, path.getMeasurement(), false, null);
if (metadataIndexPair == null) {
return null;
@@ -558,18 +638,33 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
}
- /* Find the leaf node that contains path, return all the sensors in that
leaf node which are also in allSensors set */
+ /**
+ * Find the leaf node that contains path, return all the sensors in that
leaf node which are also
+ * in allSensors set
+ *
+ * @param ignoreNotExistDevice whether throw IOException if device not found
+ * @param ioSizeRecorder can be null
+ */
public List<TimeseriesMetadata> readTimeseriesMetadata(
- IDeviceID device, String measurement, Set<String> allSensors) throws
IOException {
+ IDeviceID device,
+ String measurement,
+ Set<String> allSensors,
+ boolean ignoreNotExistDevice,
+ LongConsumer ioSizeRecorder)
+ throws IOException {
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
- getLeafMetadataIndexPair(device, measurement);
+ getLeafMetadataIndexPair(device, measurement, ioSizeRecorder);
if (metadataIndexPair == null) {
- return Collections.emptyList();
+ if (ignoreNotExistDevice) {
+ return Collections.emptyList();
+ }
+ throw new IOException("Device {" + device + "} is not in
tsFileMetaData");
}
List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
if (metadataIndexPair.right - metadataIndexPair.left.getOffset() <
Integer.MAX_VALUE) {
- ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
+ ByteBuffer buffer =
+ readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right, ioSizeRecorder);
while (buffer.hasRemaining()) {
TimeseriesMetadata timeseriesMetadata;
try {
@@ -587,6 +682,9 @@ public class TsFileSequenceReader implements AutoCloseable {
// when the buffer length is over than Integer.MAX_VALUE,
// using tsFileInput to get timeseriesMetadataList
synchronized (this) {
+ if (ioSizeRecorder != null) {
+ ioSizeRecorder.accept(metadataIndexPair.right -
metadataIndexPair.left.getOffset());
+ }
tsFileInput.position(metadataIndexPair.left.getOffset());
while (tsFileInput.position() < metadataIndexPair.right) {
TimeseriesMetadata timeseriesMetadata;
@@ -610,16 +708,17 @@ public class TsFileSequenceReader implements
AutoCloseable {
/* Get leaf MetadataIndexPair which contains path */
private Pair<IMetadataIndexEntry, Long> getLeafMetadataIndexPair(
- IDeviceID device, String measurement) throws IOException {
- readFileMetadata();
+ IDeviceID device, String measurement, LongConsumer ioSizeRecorder)
throws IOException {
+ readFileMetadata(ioSizeRecorder);
MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
- getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device,
true);
+ getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device,
true, ioSizeRecorder);
if (metadataIndexPair == null) {
return null;
}
- ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
+ ByteBuffer buffer =
+ readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right,
ioSizeRecorder);
MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
if
(!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT))
{
try {
@@ -631,7 +730,8 @@ public class TsFileSequenceReader implements AutoCloseable {
throw e;
}
metadataIndexPair =
- getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode,
measurement, false);
+ getMetadataAndEndOffsetOfMeasurementNode(
+ metadataIndexNode, measurement, false, ioSizeRecorder);
}
return metadataIndexPair;
}
@@ -700,7 +800,7 @@ public class TsFileSequenceReader implements AutoCloseable {
List<TimeseriesMetadata> timeseriesMetadataList, MetadataIndexNode node,
String measurement)
throws IOException {
Pair<IMetadataIndexEntry, Long> measurementMetadataIndexPair =
- getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false);
+ getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false,
null);
if (measurementMetadataIndexPair == null) {
return false;
@@ -1416,6 +1516,18 @@ public class TsFileSequenceReader implements
AutoCloseable {
*/
protected Pair<IMetadataIndexEntry, Long>
getMetadataAndEndOffsetOfDeviceNode(
MetadataIndexNode metadataIndex, IDeviceID deviceID, boolean
exactSearch) throws IOException {
+ return getMetadataAndEndOffsetOfDeviceNode(metadataIndex, deviceID,
exactSearch, null);
+ }
+
+ /**
+ * @param ioSizeRecorder can be null
+ */
+ protected Pair<IMetadataIndexEntry, Long>
getMetadataAndEndOffsetOfDeviceNode(
+ MetadataIndexNode metadataIndex,
+ IDeviceID deviceID,
+ boolean exactSearch,
+ LongConsumer ioSizeRecorder)
+ throws IOException {
if (metadataIndex == null) {
return null;
}
@@ -1427,12 +1539,14 @@ public class TsFileSequenceReader implements
AutoCloseable {
if
(MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType())) {
Pair<IMetadataIndexEntry, Long> childIndexEntry =
metadataIndex.getChildIndexEntry(deviceID, false);
- ByteBuffer buffer = readData(childIndexEntry.left.getOffset(),
childIndexEntry.right);
+ ByteBuffer buffer =
+ readData(childIndexEntry.left.getOffset(), childIndexEntry.right,
ioSizeRecorder);
return getMetadataAndEndOffsetOfDeviceNode(
deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
buffer, deserializeConfig),
deviceID,
- exactSearch);
+ exactSearch,
+ ioSizeRecorder);
} else {
return metadataIndex.getChildIndexEntry(deviceID, exactSearch);
}
@@ -1451,10 +1565,15 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @param measurement target measurement
* @param exactSearch whether is in exact search mode, return null when
there is no entry with
* name; or else return the nearest MetadataIndexEntry before it (for
deeper search)
+ * @param ioSizeRecorder can be null
* @return target MetadataIndexEntry, endOffset pair
*/
protected Pair<IMetadataIndexEntry, Long>
getMetadataAndEndOffsetOfMeasurementNode(
- MetadataIndexNode metadataIndex, String measurement, boolean
exactSearch) throws IOException {
+ MetadataIndexNode metadataIndex,
+ String measurement,
+ boolean exactSearch,
+ LongConsumer ioSizeRecorder)
+ throws IOException {
if
(MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType())
||
MetadataIndexNodeType.LEAF_DEVICE.equals(metadataIndex.getNodeType())) {
throw new IllegalArgumentException();
@@ -1463,12 +1582,14 @@ public class TsFileSequenceReader implements
AutoCloseable {
if
(MetadataIndexNodeType.INTERNAL_MEASUREMENT.equals(metadataIndex.getNodeType()))
{
Pair<IMetadataIndexEntry, Long> childIndexEntry =
metadataIndex.getChildIndexEntry(measurement, false);
- ByteBuffer buffer = readData(childIndexEntry.left.getOffset(),
childIndexEntry.right);
+ ByteBuffer buffer =
+ readData(childIndexEntry.left.getOffset(), childIndexEntry.right,
ioSizeRecorder);
return getMetadataAndEndOffsetOfMeasurementNode(
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
buffer, deserializeConfig),
measurement,
- exactSearch);
+ exactSearch,
+ ioSizeRecorder);
} else {
return metadataIndex.getChildIndexEntry(measurement, exactSearch);
}
@@ -1543,10 +1664,12 @@ public class TsFileSequenceReader implements
AutoCloseable {
* read the chunk's header.
*
* @param position the file offset of this chunk's header
+ * @param ioSizeRecorder can be null
*/
- private ChunkHeader readChunkHeader(long position) throws IOException {
+ private ChunkHeader readChunkHeader(long position, LongConsumer
ioSizeRecorder)
+ throws IOException {
try {
- return ChunkHeader.deserializeFrom(tsFileInput, position);
+ return ChunkHeader.deserializeFrom(tsFileInput, position,
ioSizeRecorder);
} catch (StopReadTsFileByInterruptException e) {
throw e;
} catch (Throwable t) {
@@ -1563,8 +1686,21 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @return the pages of this chunk
*/
public ByteBuffer readChunk(long position, int dataSize) throws IOException {
+ return readChunk(position, dataSize, null);
+ }
+
+ /**
+ * notice, this function will modify channel's position.
+ *
+ * @param dataSize the size of the chunk data
+ * @param position the offset of the chunk data
+ * @param ioSizeRecorder can be null
+ * @return the pages of this chunk
+ */
+ public ByteBuffer readChunk(long position, int dataSize, LongConsumer
ioSizeRecorder)
+ throws IOException {
try {
- return readData(position, dataSize);
+ return readData(position, dataSize, ioSizeRecorder);
} catch (StopReadTsFileByInterruptException e) {
throw e;
} catch (Throwable t) {
@@ -1579,10 +1715,18 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @return -chunk
*/
public Chunk readMemChunk(long offset) throws IOException {
+ return readMemChunk(offset, null);
+ }
+
+ /**
+ * @param ioSizeRecorder can be null
+ */
+ public Chunk readMemChunk(long offset, LongConsumer ioSizeRecorder) throws
IOException {
try {
- ChunkHeader header = readChunkHeader(offset);
- ByteBuffer buffer = readChunk(offset + header.getSerializedSize(),
header.getDataSize());
- return new Chunk(header, buffer, getEncryptParam());
+ ChunkHeader header = readChunkHeader(offset, ioSizeRecorder);
+ ByteBuffer buffer =
+ readChunk(offset + header.getSerializedSize(), header.getDataSize(),
ioSizeRecorder);
+ return new Chunk(header, buffer, getEncryptParam(ioSizeRecorder));
} catch (StopReadTsFileByInterruptException e) {
throw e;
} catch (Throwable t) {
@@ -1599,7 +1743,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
*/
public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
try {
- ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader());
+ ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(),
null);
ByteBuffer buffer =
readChunk(
metaData.getOffsetOfChunkHeader() + header.getSerializedSize(),
header.getDataSize());
@@ -1624,7 +1768,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @return chunk
*/
public Chunk readMemChunk(CachedChunkLoaderImpl.ChunkCacheKey chunkCacheKey)
throws IOException {
- ChunkHeader header =
readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader());
+ ChunkHeader header =
readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader(), null);
ByteBuffer buffer =
readChunk(
chunkCacheKey.getOffsetOfChunkHeader() +
header.getSerializedSize(),
@@ -1664,7 +1808,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
return null;
}
IChunkMetadata lastChunkMetadata =
chunkMetadataList.get(chunkMetadataList.size() - 1);
- ChunkHeader header =
readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader());
+ ChunkHeader header =
readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), null);
return new MeasurementSchema(
lastChunkMetadata.getMeasurementUid(),
header.getDataType(),
@@ -1792,6 +1936,26 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @return data that been read.
*/
protected ByteBuffer readData(long position, int totalSize) throws
IOException {
+ return readData(position, totalSize, null);
+ }
+
+ /**
+ * read data from tsFileInput, from the current position (if position = -1),
or the given
+ * position. <br>
+ * if position = -1, the tsFileInput's position will be changed to the
current position + real
+ * data size that has been read. Otherwise, the tsFileInput's position is not
changed.
+ *
+ * @param position the start position of data in the tsFileInput, or the
current position if
+ * position = -1
+ * @param totalSize the size of data that want to read
+ * @param ioSizeRecorder can be null
+ * @return data that has been read.
+ */
+ protected ByteBuffer readData(long position, int totalSize, LongConsumer
ioSizeRecorder)
+ throws IOException {
+ if (ioSizeRecorder != null) {
+ ioSizeRecorder.accept(totalSize);
+ }
int allocateSize = Math.min(MAX_READ_BUFFER_SIZE, totalSize);
int allocateNum = (int) Math.ceil((double) totalSize / allocateSize);
ByteBuffer buffer = ByteBuffer.allocate(totalSize);
@@ -1833,8 +1997,23 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @return data that been read.
*/
protected ByteBuffer readData(long start, long end) throws IOException {
+ return readData(start, end, null);
+ }
+
+ /**
+ * read data from tsFileInput, from the current position (if position = -1),
or the given
+ * position.
+ *
+ * @param start the start position of data in the tsFileInput, or the
current position if position
+ * = -1
+ * @param end the end position of data that want to read
+ * @param ioSizeRecorder can be null
+ * @return data that has been read.
+ */
+ protected ByteBuffer readData(long start, long end, LongConsumer
ioSizeRecorder)
+ throws IOException {
try {
- return readData(start, (int) (end - start));
+ return readData(start, (int) (end - start), ioSizeRecorder);
} catch (StopReadTsFileByInterruptException e) {
throw e;
} catch (Throwable t) {
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java
index 817f0f46..53be3fec 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java
@@ -25,19 +25,23 @@ import org.apache.tsfile.exception.NotImplementedException;
import org.apache.tsfile.file.metadata.TsFileMetadata;
import java.io.IOException;
+import java.util.function.LongConsumer;
/** A class for reading unclosed tsfile. */
public class UnClosedTsFileReader extends TsFileSequenceReader {
private EncryptParameter encryptParam;
- public UnClosedTsFileReader(String file) throws IOException {
- super(file, false);
+ // ioSizeRecorder can be null
+ public UnClosedTsFileReader(String file, LongConsumer ioSizeRecorder) throws
IOException {
+ super(file, false, ioSizeRecorder);
encryptParam = EncryptUtils.encryptParam;
}
- public UnClosedTsFileReader(String file, EncryptParameter decryptParam)
throws IOException {
- super(file, false);
+ // ioSizeRecorder can be null
+ public UnClosedTsFileReader(
+ String file, EncryptParameter decryptParam, LongConsumer ioSizeRecorder)
throws IOException {
+ super(file, false, ioSizeRecorder);
this.encryptParam = encryptParam;
}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java
index bf284515..e4bfbb44 100644
---
a/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java
+++
b/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java
@@ -73,7 +73,7 @@ public class TimeSeriesMetadataReadTest {
// s4 should not be returned as result
set.add("s4");
List<TimeseriesMetadata> timeseriesMetadataList =
- reader.readTimeseriesMetadata(path.getIDeviceID(),
path.getMeasurement(), set);
+ reader.readTimeseriesMetadata(path.getIDeviceID(),
path.getMeasurement(), set, false, null);
Assert.assertEquals(3, timeseriesMetadataList.size());
for (int i = 1; i <= timeseriesMetadataList.size(); i++) {
Assert.assertEquals("s" + i, timeseriesMetadataList.get(i -
1).getMeasurementId());
@@ -87,7 +87,7 @@ public class TimeSeriesMetadataReadTest {
// so the result is not supposed to contain this measurement's timeseries
metadata
set.add("s8");
timeseriesMetadataList =
- reader.readTimeseriesMetadata(path.getIDeviceID(),
path.getMeasurement(), set);
+ reader.readTimeseriesMetadata(path.getIDeviceID(),
path.getMeasurement(), set, false, null);
Assert.assertEquals(2, timeseriesMetadataList.size());
for (int i = 5; i < 7; i++) {
Assert.assertEquals("s" + i, timeseriesMetadataList.get(i -
5).getMeasurementId());
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java
index 62dc9bda..8e865413 100644
---
a/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java
+++
b/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java
@@ -57,7 +57,7 @@ public class UnClosedTsFileReaderTest {
ChunkMetadata chunkMetadata =
writer.getChunkGroupMetadataList().get(0).getChunkMetadataList().get(0);
- UnClosedTsFileReader reader = new
UnClosedTsFileReader(file.getAbsolutePath());
+ UnClosedTsFileReader reader = new
UnClosedTsFileReader(file.getAbsolutePath(), null);
Chunk chunk = reader.readMemChunk(chunkMetadata);
ChunkReader chunkReader = new ChunkReader(chunk);
BatchData batchData = chunkReader.nextPageData();