This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch iotdb in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 4447ef785da027a579401e10e8e9f8467909a1fb Author: shuwenwei <[email protected]> AuthorDate: Tue May 28 10:39:18 2024 +0800 Add some methods for compaction --- .../apache/tsfile/read/TsFileSequenceReader.java | 42 +++++++++++++++++----- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index a15c9890..882b0451 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -1335,6 +1335,8 @@ public class TsFileSequenceReader implements AutoCloseable { } else { return metadataIndex.getChildIndexEntry(deviceID, exactSearch); } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e) { logger.error("Something error happened while deserializing MetadataIndex of file {}", file); throw e; @@ -2165,7 +2167,6 @@ public class TsFileSequenceReader implements AutoCloseable { } ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); MetadataIndexNode metadataIndexNode; - TimeseriesMetadata firstTimeseriesMetadata; try { // next layer MeasurementNode of the specific DeviceNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer, false); @@ -2173,7 +2174,19 @@ public class TsFileSequenceReader implements AutoCloseable { logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file); throw e; } - firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode); + return getAlignedChunkMetadataByMetadataIndexNode(device, metadataIndexNode); + } + + /** + * Get AlignedChunkMetadata of sensors under one device. Notice: if all the value chunks is empty + * chunk, then return empty list. + * + * @param device device name + * @param metadataIndexNode the first measurement metadata index node of the device + */ + public List<AlignedChunkMetadata> getAlignedChunkMetadataByMetadataIndexNode( + IDeviceID device, MetadataIndexNode metadataIndexNode) throws IOException { + TimeseriesMetadata firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode); if (firstTimeseriesMetadata == null) { throw new IOException("Timeseries of device {" + device + "} are not aligned"); } @@ -2187,7 +2200,7 @@ public class TsFileSequenceReader implements AutoCloseable { if (i != metadataIndexEntryList.size() - 1) { endOffset = metadataIndexEntryList.get(i + 1).getOffset(); } - buffer = readData(metadataIndexEntry.getOffset(), endOffset); + ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset); if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); while (buffer.hasRemaining()) { @@ -2407,9 +2420,22 @@ public class TsFileSequenceReader implements AutoCloseable { }; } - Queue<Pair<Long, Long>> queue = new LinkedList<>(); ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); - collectEachLeafMeasurementNodeOffsetRange(buffer, queue); + MetadataIndexNode firstMeasurementNode = MetadataIndexNode.deserializeFrom(buffer, false); + return getMeasurementChunkMetadataListMapIterator(firstMeasurementNode); + } + + /** + * @return An iterator of linked hashmaps ( measurement -> chunk metadata list ). When traversing + * the linked hashmap, you will get chunk metadata lists according to the lexicographic order + * of the measurements. The first measurement of the linked hashmap of each iteration is + * always larger than the last measurement of the linked hashmap of the previous iteration in + * lexicographic order. + */ + public Iterator<Map<String, List<ChunkMetadata>>> getMeasurementChunkMetadataListMapIterator( + MetadataIndexNode firstMeasurementMetadataIndexNodeOfDevice) throws IOException { + Queue<Pair<Long, Long>> queue = new LinkedList<>(); + collectEachLeafMeasurementNodeOffsetRange(firstMeasurementMetadataIndexNodeOfDevice, queue); return new Iterator<Map<String, List<ChunkMetadata>>>() { @@ -2450,9 +2476,8 @@ public class TsFileSequenceReader implements AutoCloseable { } private void collectEachLeafMeasurementNodeOffsetRange( - ByteBuffer buffer, Queue<Pair<Long, Long>> queue) throws IOException { + MetadataIndexNode metadataIndexNode, Queue<Pair<Long, Long>> queue) throws IOException { try { - final MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer, false); final MetadataIndexNodeType metadataIndexNodeType = metadataIndexNode.getNodeType(); final int metadataIndexListSize = metadataIndexNode.getChildren().size(); for (int i = 0; i < metadataIndexListSize; ++i) { @@ -2465,7 +2490,8 @@ public class TsFileSequenceReader implements AutoCloseable { queue.add(new Pair<>(startOffset, endOffset)); continue; } - collectEachLeafMeasurementNodeOffsetRange(readData(startOffset, endOffset), queue); + collectEachLeafMeasurementNodeOffsetRange( + MetadataIndexNode.deserializeFrom(readData(startOffset, endOffset), false), queue); } } catch (StopReadTsFileByInterruptException e) { throw e;
