danny0405 commented on code in PR #11924:
URL: https://github.com/apache/hudi/pull/11924#discussion_r1881355144
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -224,6 +249,159 @@ public IndexedRecord next() {
}
}
+ /**
+ * StreamingRecordIterator is an iterator for reading records from a Hoodie
log block in a streaming manner.
+ * It reads data from a given input stream, decodes Avro records, and
supports schema promotion.
+ *
+ * This iterator ensures that the buffer has enough data for each record and
handles buffer management,
+ * including compaction and resizing when necessary.
+ */
+ private static class StreamingRecordIterator implements
ClosableIterator<IndexedRecord> {
+ private static final int RECORD_LENGTH_BYTES = 4;
+ // The minimum buffer size in bytes
+ private static final int MIN_BUFFER_SIZE = RECORD_LENGTH_BYTES;
+ private final SeekableDataInputStream inputStream;
+ private final GenericDatumReader<IndexedRecord> reader;
+ private final ThreadLocal<BinaryDecoder> decoderCache = new
ThreadLocal<>();
+ private Option<Schema> promotedSchema = Option.empty();
+ private int totalRecords = 0;
+ private int readRecords = 0;
+ private ByteBuffer buffer;
+
+ private StreamingRecordIterator(Schema readerSchema, Schema writerSchema,
SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation, int bufferSize) throws
IOException {
+ // Negative values should not be used because they are generally
considered to indicate the operation of closing stream reading,
+ // in order to avoid confusing users into thinking that stream reading
can be closed.
+ checkArgument(bufferSize > 0, "Buffer size must be greater than zero");
+ bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
+
+ this.inputStream = inputStream;
+
+ // Seek to the start of the block
+ this.inputStream.seek(contentLocation.getContentPositionInLogFile());
+
+ // Read version for this data block
+ int version = this.inputStream.readInt();
+ if (new HoodieAvroDataBlockVersion(version).hasRecordCount()) {
+ this.totalRecords = this.inputStream.readInt();
+ }
+
+ if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema,
readerSchema)) {
+ this.reader = new GenericDatumReader<>(writerSchema, writerSchema);
+ this.promotedSchema = Option.of(readerSchema);
+ } else {
+ this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+ }
+
+ this.buffer = ByteBuffer.allocate(Math.min(bufferSize,
Math.toIntExact(contentLocation.getBlockSize())));
+ // The buffer defaults to read mode
+ this.buffer.flip();
+ }
+
+ public static StreamingRecordIterator getInstance(HoodieAvroDataBlock
dataBlock, SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation, int bufferSize) throws
IOException {
+ return new StreamingRecordIterator(dataBlock.readerSchema,
dataBlock.getSchemaFromHeader(), inputStream, contentLocation, bufferSize);
+ }
+
+ @Override
+ public void close() {
+ this.decoderCache.remove();
+ this.buffer = null;
+ try {
+ this.inputStream.close();
+ } catch (IOException ex) {
+ throw new HoodieIOException("Failed to close input stream", ex);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return readRecords < totalRecords;
+ }
+
+ @Override
+ public IndexedRecord next() {
+ try {
+ ensureBufferHasData(RECORD_LENGTH_BYTES);
+
+ // Read the record length
+ int recordLength = buffer.getInt();
+
+ // Ensure buffer is large enough and has enough data
+ ensureBufferHasData(recordLength);
+
+ // Decode the record
+ BinaryDecoder decoder =
DecoderFactory.get().binaryDecoder(buffer.array(), buffer.position(),
recordLength, this.decoderCache.get());
+ this.decoderCache.set(decoder);
+ IndexedRecord record = this.reader.read(null, decoder);
+ buffer.position(buffer.position() + recordLength);
+ this.readRecords++;
+ if (this.promotedSchema.isPresent()) {
+ return HoodieAvroUtils.rewriteRecordWithNewSchema(record,
this.promotedSchema.get());
+ }
+ return record;
+ } catch (IOException e) {
+ throw new HoodieIOException("Unable to convert bytes to record", e);
+ }
+ }
+
+ /**
+ * Ensures that the buffer contains at least the specified amount of data.
+ *
+ * This method checks if the buffer has the required amount of data. If
not, it attempts to fill the buffer
Review Comment:
missing `<p>` tag at the front.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -224,6 +249,159 @@ public IndexedRecord next() {
}
}
+ /**
+ * StreamingRecordIterator is an iterator for reading records from a Hoodie
log block in a streaming manner.
+ * It reads data from a given input stream, decodes Avro records, and
supports schema promotion.
+ *
+ * This iterator ensures that the buffer has enough data for each record and
handles buffer management,
+ * including compaction and resizing when necessary.
+ */
+ private static class StreamingRecordIterator implements
ClosableIterator<IndexedRecord> {
+ private static final int RECORD_LENGTH_BYTES = 4;
+ // The minimum buffer size in bytes
+ private static final int MIN_BUFFER_SIZE = RECORD_LENGTH_BYTES;
+ private final SeekableDataInputStream inputStream;
+ private final GenericDatumReader<IndexedRecord> reader;
+ private final ThreadLocal<BinaryDecoder> decoderCache = new
ThreadLocal<>();
+ private Option<Schema> promotedSchema = Option.empty();
+ private int totalRecords = 0;
+ private int readRecords = 0;
+ private ByteBuffer buffer;
+
+ private StreamingRecordIterator(Schema readerSchema, Schema writerSchema,
SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation, int bufferSize) throws
IOException {
+ // Negative values should not be used because they are generally
considered to indicate the operation of closing stream reading,
+ // in order to avoid confusing users into thinking that stream reading
can be closed.
+ checkArgument(bufferSize > 0, "Buffer size must be greater than zero");
+ bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
+
+ this.inputStream = inputStream;
+
+ // Seek to the start of the block
+ this.inputStream.seek(contentLocation.getContentPositionInLogFile());
+
+ // Read version for this data block
+ int version = this.inputStream.readInt();
+ if (new HoodieAvroDataBlockVersion(version).hasRecordCount()) {
+ this.totalRecords = this.inputStream.readInt();
+ }
+
+ if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema,
readerSchema)) {
+ this.reader = new GenericDatumReader<>(writerSchema, writerSchema);
+ this.promotedSchema = Option.of(readerSchema);
+ } else {
+ this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+ }
+
+ this.buffer = ByteBuffer.allocate(Math.min(bufferSize,
Math.toIntExact(contentLocation.getBlockSize())));
+ // The buffer defaults to read mode
+ this.buffer.flip();
+ }
+
+ public static StreamingRecordIterator getInstance(HoodieAvroDataBlock
dataBlock, SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation, int bufferSize) throws
IOException {
+ return new StreamingRecordIterator(dataBlock.readerSchema,
dataBlock.getSchemaFromHeader(), inputStream, contentLocation, bufferSize);
+ }
+
+ @Override
+ public void close() {
+ this.decoderCache.remove();
+ this.buffer = null;
+ try {
+ this.inputStream.close();
+ } catch (IOException ex) {
+ throw new HoodieIOException("Failed to close input stream", ex);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return readRecords < totalRecords;
+ }
+
+ @Override
+ public IndexedRecord next() {
+ try {
+ ensureBufferHasData(RECORD_LENGTH_BYTES);
+
+ // Read the record length
+ int recordLength = buffer.getInt();
+
+ // Ensure buffer is large enough and has enough data
+ ensureBufferHasData(recordLength);
+
+ // Decode the record
+ BinaryDecoder decoder =
DecoderFactory.get().binaryDecoder(buffer.array(), buffer.position(),
recordLength, this.decoderCache.get());
+ this.decoderCache.set(decoder);
+ IndexedRecord record = this.reader.read(null, decoder);
+ buffer.position(buffer.position() + recordLength);
+ this.readRecords++;
+ if (this.promotedSchema.isPresent()) {
+ return HoodieAvroUtils.rewriteRecordWithNewSchema(record,
this.promotedSchema.get());
+ }
+ return record;
+ } catch (IOException e) {
+ throw new HoodieIOException("Unable to convert bytes to record", e);
+ }
+ }
+
+ /**
+ * Ensures that the buffer contains at least the specified amount of data.
+ *
+ * This method checks if the buffer has the required amount of data. If
not, it attempts to fill the buffer
Review Comment:
missing `<p>` tag at the front.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]