This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b40d22a67d1 [HUDI-8190] Implement efficient streaming reads for
HoodieDataBlocks (#11924)
b40d22a67d1 is described below
commit b40d22a67d1aa25d882b22b39ba858129f509427
Author: usberkeley <[email protected]>
AuthorDate: Mon Feb 10 10:35:15 2025 +0800
[HUDI-8190] Implement efficient streaming reads for HoodieDataBlocks
(#11924)
---
.../table/log/block/HoodieAvroDataBlock.java | 175 ++++++++++
.../common/table/log/block/HoodieDataBlock.java | 81 ++++-
.../table/log/block/HoodieHFileDataBlock.java | 18 +
.../common/table/log/block/HoodieLogBlock.java | 4 +
.../table/log/block/HoodieParquetDataBlock.java | 18 +
.../table/log/block/TestHoodieAvroDataBlock.java | 373 +++++++++++++++++++++
6 files changed, 666 insertions(+), 3 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 488ffb1ce43..0a440919ab6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -27,6 +27,7 @@ import
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.io.SeekableDataInputStream;
@@ -52,6 +53,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -146,6 +148,26 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
return new CloseableMappingIterator<>(iterator, data -> (HoodieRecord<T>)
new HoodieAvroIndexedRecord(data));
}
+ /**
+ * Streaming deserialization of records.
+ *
+ * @param inputStream The input stream from which to read the records.
+ * @param contentLocation The location within the input stream where the
content starts.
+ * @param bufferSize The size of the buffer to use for reading the records.
+ * @return A ClosableIterator over HoodieRecord<T>.
+ * @throws IOException If there is an error reading or deserializing the
records.
+ */
+ @Override
+ protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(
+ SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation,
+ HoodieRecordType type,
+ int bufferSize
+ ) throws IOException {
+ StreamingRecordIterator iterator =
StreamingRecordIterator.getInstance(this, inputStream, contentLocation,
bufferSize);
+ return new CloseableMappingIterator<>(iterator, data -> (HoodieRecord<T>)
new HoodieAvroIndexedRecord(data));
+ }
+
@Override
protected <T> ClosableIterator<T> deserializeRecords(HoodieReaderContext<T>
readerContext, byte[] content) throws IOException {
checkState(this.readerSchema != null, "Reader's schema has to be
non-null");
@@ -221,6 +243,159 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
}
}
+ /**
+ * {@code StreamingRecordIterator} is an iterator for reading records from a
Hoodie log block in a streaming manner.
+ * It decodes the given input stream into Avro records with optional schema
promotion.
+ *
+ * <p>This iterator ensures that the buffer has enough data for each record
and handles buffer setup,
+ * including compaction and resizing when necessary.
+ */
+ private static class StreamingRecordIterator implements
ClosableIterator<IndexedRecord> {
+ private static final int RECORD_LENGTH_BYTES = 4;
+ // The minimum buffer size in bytes
+ private static final int MIN_BUFFER_SIZE = RECORD_LENGTH_BYTES;
+ private final SeekableDataInputStream inputStream;
+ private final GenericDatumReader<IndexedRecord> reader;
+ private final ThreadLocal<BinaryDecoder> decoderCache = new
ThreadLocal<>();
+ private Option<Schema> promotedSchema = Option.empty();
+ private int totalRecords = 0;
+ private int readRecords = 0;
+ private ByteBuffer buffer;
+
+ private StreamingRecordIterator(Schema readerSchema, Schema writerSchema,
SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation, int bufferSize) throws
IOException {
+ // Negative buffer sizes are rejected: a negative value is commonly read as a
signal to disable streaming reads,
+ // and accepting one here could mislead users into thinking streaming reads
can be switched off this way.
+ checkArgument(bufferSize > 0, "Buffer size must be greater than zero");
+ bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
+
+ this.inputStream = inputStream;
+
+ // Seek to the start of the block
+ this.inputStream.seek(contentLocation.getContentPositionInLogFile());
+
+ // Read version for this data block
+ int version = this.inputStream.readInt();
+ if (new HoodieAvroDataBlockVersion(version).hasRecordCount()) {
+ this.totalRecords = this.inputStream.readInt();
+ }
+
+ if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema,
readerSchema)) {
+ this.reader = new GenericDatumReader<>(writerSchema, writerSchema);
+ this.promotedSchema = Option.of(readerSchema);
+ } else {
+ this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+ }
+
+ this.buffer = ByteBuffer.allocate(Math.min(bufferSize,
Math.toIntExact(contentLocation.getBlockSize())));
+ // The buffer defaults to read mode
+ this.buffer.flip();
+ }
+
+ public static StreamingRecordIterator getInstance(HoodieAvroDataBlock
dataBlock, SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation, int bufferSize) throws
IOException {
+ return new StreamingRecordIterator(dataBlock.readerSchema,
dataBlock.getSchemaFromHeader(), inputStream, contentLocation, bufferSize);
+ }
+
+ @Override
+ public void close() {
+ this.decoderCache.remove();
+ this.buffer = null;
+ try {
+ this.inputStream.close();
+ } catch (IOException ex) {
+ throw new HoodieIOException("Failed to close input stream", ex);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return readRecords < totalRecords;
+ }
+
+ @Override
+ public IndexedRecord next() {
+ try {
+ ensureBufferHasData(RECORD_LENGTH_BYTES);
+
+ // Read the record length
+ int recordLength = buffer.getInt();
+
+ // Ensure buffer is large enough and has enough data
+ ensureBufferHasData(recordLength);
+
+ // Decode the record
+ BinaryDecoder decoder =
DecoderFactory.get().binaryDecoder(buffer.array(), buffer.position(),
recordLength, this.decoderCache.get());
+ this.decoderCache.set(decoder);
+ IndexedRecord record = this.reader.read(null, decoder);
+ buffer.position(buffer.position() + recordLength);
+ this.readRecords++;
+ if (this.promotedSchema.isPresent()) {
+ return HoodieAvroUtils.rewriteRecordWithNewSchema(record,
this.promotedSchema.get());
+ }
+ return record;
+ } catch (IOException e) {
+ throw new HoodieIOException("Unable to convert bytes to record", e);
+ }
+ }
+
+ /**
+ * Ensures that the buffer contains at least the specified amount of data.
+ *
+ * <p>This method checks if the buffer has the required amount of data. If
not, it attempts to fill the buffer
+ * by reading more data from the input stream. If the buffer's capacity is
insufficient, it allocates a larger buffer.
+ * If the end of the input stream is reached before the required amount of
data is available, an exception is thrown.
+ *
+ * @param dataLength the amount of data (in bytes) that must be available
in the buffer.
+ * @throws IOException if an I/O error occurs while reading from the input
stream.
+ * @throws HoodieException if the end of the input stream is reached
before the required amount of data is available.
+ */
+ private void ensureBufferHasData(int dataLength) throws IOException {
// If the span from the current read position to the buffer's capacity is
too small to hold the required
// data length, compact the buffer so the unread bytes move to the front,
freeing room at the end
+ if (buffer.capacity() - buffer.position() < dataLength) {
+ buffer.compact();
+ // Reset the buffer to read mode
+ buffer.flip();
+ }
+
+ // Check again if the buffer still doesn't have enough space after
compaction
+ if (buffer.capacity() - buffer.position() < dataLength) {
+ ByteBuffer newBuffer = ByteBuffer.allocate(buffer.position() +
dataLength);
+ newBuffer.put(buffer);
+ // Reset the new buffer to read mode
+ newBuffer.flip();
+ buffer = newBuffer;
+ }
+
+ while (buffer.remaining() < dataLength) {
+ boolean hasMoreData = fillBuffer();
+ if (!hasMoreData && buffer.remaining() < dataLength) {
+ throw new HoodieException("Unable to read enough data from the input
stream to fill the buffer");
+ }
+ }
+ }
+
+ /**
+ * Attempts to fill the buffer with more data from the input stream.
+ *
+ * <p>This method reads data from the input stream into the buffer,
starting at the current limit
+ * and reading up to the capacity of the buffer. If the end of the input
stream is reached,
+ * it returns false. Otherwise, it updates the buffer's limit to reflect
the new data and returns true.
+ *
+ * @return true if data was successfully read into the buffer; false if
the end of the input stream was reached.
+ * @throws IOException if an I/O error occurs while reading from the input
stream.
+ */
+ private boolean fillBuffer() throws IOException {
+ int bytesRead = inputStream.read(buffer.array(), buffer.limit(),
buffer.capacity() - buffer.limit());
+ if (bytesRead == -1) {
+ return false;
+ }
+
+ buffer.limit(buffer.limit() + bytesRead);
+ return true;
+ }
+ }
+
//----------------------------------------------------------------------------------------
// DEPRECATED METHODS
//
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 6f48cb27a74..c2a707c8303 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -142,16 +142,38 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
}
/**
- * Returns all the records iterator contained w/in this block.
+ * Returns an iterator over all the records contained within this block.
+ * This method uses a default buffer size of 0, which means it will read
+ * the entire Data Block content at once.
+ *
+ * @param type The type of HoodieRecord.
+ * @param <T> The type parameter for HoodieRecord.
+ * @return A ClosableIterator over HoodieRecord<T>.
*/
public final <T> ClosableIterator<HoodieRecord<T>>
getRecordIterator(HoodieRecordType type) {
+ return getRecordIterator(type, 0);
+ }
+
+ /**
+ * Returns an iterator over all the records contained within this block.
+ *
+ * @param type The type of HoodieRecord.
+ * @param bufferSize The size of the buffer for streaming read.
+ * A bufferSize less than or equal to 0 means that
streaming read is disabled and
+ * the entire block content will be read at once.
+ * A bufferSize greater than 0 enables streaming read with
the specified buffer size.
+ * @param <T> The type parameter for HoodieRecord.
+ * @return A ClosableIterator over HoodieRecord<T>.
+ * @throws HoodieIOException If there is an error reading records from the
block payload.
+ */
+ public final <T> ClosableIterator<HoodieRecord<T>>
getRecordIterator(HoodieRecordType type, int bufferSize) {
if (records.isPresent()) {
// TODO need convert record type
return list2Iterator(unsafeCast(records.get()));
}
try {
// in case records are absent, read content lazily and then convert to
IndexedRecords
- return readRecordsFromBlockPayload(type);
+ return readRecordsFromBlockPayload(type, bufferSize);
} catch (IOException io) {
throw new HoodieIOException("Unable to convert content bytes to
records", io);
}
@@ -170,6 +192,23 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
* @throws IOException in case of failures encountered when reading/parsing
records
*/
public final <T> ClosableIterator<HoodieRecord<T>>
getRecordIterator(List<String> keys, boolean fullKey, HoodieRecordType type)
throws IOException {
+ return getRecordIterator(keys, fullKey, type, 0);
+ }
+
+ /**
+ * Batch get of keys of interest. Implementation can choose to either do
full scan and return matched entries or
+ * do a seek based parsing and return matched entries.
+ *
+ * @param keys keys of interest.
+ * @param bufferSize The size of the buffer for streaming read.
+ * A bufferSize less than or equal to 0 means that
streaming read is disabled and
+ * the entire block content will be read at once.
+ * A bufferSize greater than 0 enables streaming read with
the specified buffer size.
+ * @param <T> The type parameter for HoodieRecord.
+ * @return List of IndexedRecords for the keys of interest.
+ * @throws IOException in case of failures encountered when reading/parsing
records
+ */
+ public final <T> ClosableIterator<HoodieRecord<T>>
getRecordIterator(List<String> keys, boolean fullKey, HoodieRecordType type,
int bufferSize) throws IOException {
boolean fullScan = keys.isEmpty();
if (enablePointLookups && !fullScan) {
return lookupRecords(keys, fullKey);
@@ -177,7 +216,7 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
// Otherwise, we fetch all the records and filter out all the records, but
the
// ones requested
- ClosableIterator<HoodieRecord<T>> allRecords = getRecordIterator(type);
+ ClosableIterator<HoodieRecord<T>> allRecords = getRecordIterator(type,
bufferSize);
if (fullScan) {
return allRecords;
}
@@ -246,6 +285,26 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
}
}
+ /**
+ * Reads records from the block payload using a specified buffer size.
+ * This method attempts to read serialized records from the block payload,
leveraging a buffer size for streaming reads.
+ * If the buffer size is less than or equal to 0, it reads the entire block
content at once.
+ *
+ * @param bufferSize The size of the buffer for streaming read.
+ * A bufferSize less than or equal to 0 means that
streaming read is disabled and
+ * the entire block content will be read at once.
+ * A bufferSize greater than 0 enables streaming read with
the specified buffer size.
+ * @return A ClosableIterator over HoodieRecord<T>.
+ * @throws IOException If there is an error reading or deserializing the
records.
+ */
+ protected <T> ClosableIterator<HoodieRecord<T>>
readRecordsFromBlockPayload(HoodieRecordType type, int bufferSize) throws
IOException {
+ if (getContent().isPresent() || bufferSize <= 0) {
+ return readRecordsFromBlockPayload(type);
+ }
+
+ return deserializeRecords(getInputStreamSupplier().get(),
getBlockContentLocation().get(), type, bufferSize);
+ }
+
protected <T> ClosableIterator<T>
readRecordsFromBlockPayload(HoodieReaderContext<T> readerContext) throws
IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
@@ -270,6 +329,22 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
protected abstract <T> ClosableIterator<HoodieRecord<T>>
deserializeRecords(byte[] content, HoodieRecordType type) throws IOException;
+ /**
+ * Streaming deserialization of records.
+ *
+ * @param inputStream The input stream from which to read the records.
+ * @param contentLocation The location within the input stream where the
content starts.
+ * @param bufferSize The size of the buffer to use for reading the records.
+ * @return A ClosableIterator over HoodieRecord<T>.
+ * @throws IOException If there is an error reading or deserializing the
records.
+ */
+ protected abstract <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(
+ SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation,
+ HoodieRecordType type,
+ int bufferSize
+ ) throws IOException;
+
/**
* Deserializes the content bytes of the data block to the records in
engine-specific representation.
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 4d1267701ba..ea474142463 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -125,6 +125,24 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
}
+ /**
+ * Streaming deserialization of records.
+ *
+ * @param inputStream The input stream from which to read the records.
+ * @param contentLocation The location within the input stream where the
content starts.
+ * @param bufferSize The size of the buffer to use for reading the records.
+ * @return A ClosableIterator over HoodieRecord<T>.
+ * @throws IOException If there is an error reading or deserializing the
records.
+ */
+ protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(
+ SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation,
+ HoodieRecordType type,
+ int bufferSize
+ ) throws IOException {
+ throw new UnsupportedOperationException("Streaming deserialization is not
supported for HoodieHFileDataBlock");
+ }
+
@Override
protected <T> ClosableIterator<T> deserializeRecords(HoodieReaderContext<T>
readerContext, byte[] content) throws IOException {
checkState(readerSchema != null, "Reader's schema has to be non-null");
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 04c7827b4e4..78c39ff5faf 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -345,6 +345,10 @@ public abstract class HoodieLogBlock {
return Option.of(content);
}
+ protected Supplier<SeekableDataInputStream> getInputStreamSupplier() {
+ return inputStreamSupplier;
+ }
+
/**
* Adds the record positions if the base file instant time of the positions
exists
* in the log header and the record positions are all valid.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 8d9b1221c54..35c0a0f8536 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -159,6 +159,24 @@ public class HoodieParquetDataBlock extends
HoodieDataBlock {
throw new UnsupportedOperationException("Should not be invoked");
}
+ /**
+ * Streaming deserialization of records.
+ *
+ * @param inputStream The input stream from which to read the records.
+ * @param contentLocation The location within the input stream where the
content starts.
+ * @param bufferSize The size of the buffer to use for reading the records.
+ * @return A ClosableIterator over HoodieRecord<T>.
+ * @throws IOException If there is an error reading or deserializing the
records.
+ */
+ protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(
+ SeekableDataInputStream inputStream,
+ HoodieLogBlockContentLocation contentLocation,
+ HoodieRecordType type,
+ int bufferSize
+ ) throws IOException {
+ throw new UnsupportedOperationException("Should not be invoked");
+ }
+
@Override
protected <T> ClosableIterator<T> deserializeRecords(HoodieReaderContext<T>
readerContext, byte[] content) throws IOException {
throw new UnsupportedOperationException("Should not be invoked");
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieAvroDataBlock.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieAvroDataBlock.java
new file mode 100644
index 00000000000..98d918c1cb5
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieAvroDataBlock.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation;
+import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestHoodieAvroDataBlock {
+ // Record key field of the test data
+ private static final String RECORD_KEY_FIELD =
HoodieRecord.RECORD_KEY_METADATA_FIELD;
+
+ private static final String SCHEMA_STRING = "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"RandomRecord\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"" + RECORD_KEY_FIELD + "\", \"type\":
\"string\"},\n"
+ + " {\"name\": \"value\", \"type\": \"double\"}\n"
+ + " ]\n"
+ + "}";
+
+ private static final Schema SCHEMA = new
Schema.Parser().parse(SCHEMA_STRING);
+
+ // Just to avoid errors when the HoodieAvroDataBlock constructor wraps
HoodieLogBlockContentLocation as an Option and encounters a null value
+ private static final HoodieLogBlockContentLocation
NULL_BLOCK_CONTENT_LOCATION = new HoodieLogBlockContentLocation(null, null, 0,
0, 0);
+
+ /**
+ * Tests the getRecordIterator method of HoodieAvroDataBlock when records
are present.
+ *
+ * @param useKeyFilter Whether to use a key filter.
+ * @param keyFilterFullKeyMatch Whether the key filter should match the full
key.
+ * @throws IOException If an I/O error occurs during the test.
+ */
+ @ParameterizedTest
+ @CsvSource({
+ "true, true",
+ "true, false",
+ "false, true",
+ "false, false"
+ })
+ public void testGetRecordIteratorWithRecordsPresent(boolean useKeyFilter,
boolean keyFilterFullKeyMatch) throws IOException {
+ List<HoodieRecord> records = generateRandomHoodieRecords(SCHEMA, 1000);
+ HoodieAvroDataBlock block = createHoodieAvroDataBlock(SCHEMA, records);
+
+ // Expect records for using key filter
+ List<HoodieRecord> recordsForFilter = selectRandomRecords(records,
keyFilterFullKeyMatch);
+ List<String> keysForFilter =
recordsForFilter.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList());
+
+ // Get record iterator
+ try (ClosableIterator<HoodieRecord<Object>> recordIterator = useKeyFilter
+ ? block.getRecordIterator(keysForFilter, keyFilterFullKeyMatch,
HoodieRecord.HoodieRecordType.AVRO)
+ : block.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
+ List<HoodieRecord> retrievedRecords = new ArrayList<>();
+ recordIterator.forEachRemaining(retrievedRecords::add);
+
+ verifyRecords(useKeyFilter ? recordsForFilter : records,
retrievedRecords);
+ }
+ }
+
+ /**
+ * Tests the getRecordIterator method of HoodieAvroDataBlock using content
directly.
+ *
+ * @param useKeyFilter Whether to use a key filter.
+ * @param keyFilterFullKeyMatch Whether the key filter should match the full
key.
+ * @throws IOException If an I/O error occurs during the test.
+ */
+ @ParameterizedTest
+ @CsvSource({
+ "true, true",
+ "true, false",
+ "false, true",
+ "false, false"
+ })
+ public void testGetRecordIteratorWithContent(boolean useKeyFilter, boolean
keyFilterFullKeyMatch) throws IOException {
+ List<HoodieRecord> records = generateRandomHoodieRecords(SCHEMA, 1000);
+ byte[] content = createHoodieAvroDataBlockContent(SCHEMA, records);
+ Option<byte[]> contentOpt = Option.of(content);
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SCHEMA.toString());
+
+ // Expect records for using key filter
+ List<HoodieRecord> recordsForFilter = selectRandomRecords(records,
keyFilterFullKeyMatch);
+ List<String> keysForFilter =
recordsForFilter.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList());
+
+ // With log block content
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(() -> null,
contentOpt, false,
+ NULL_BLOCK_CONTENT_LOCATION, Option.of(SCHEMA), header, new
HashMap<>(), RECORD_KEY_FIELD);
+
+ // Get record iterator
+ try (ClosableIterator<HoodieRecord<Object>> recordIterator = useKeyFilter
+ ? dataBlock.getRecordIterator(keysForFilter, keyFilterFullKeyMatch,
HoodieRecord.HoodieRecordType.AVRO)
+ : dataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
+ List<HoodieRecord> retrievedRecords = new ArrayList<>();
+ recordIterator.forEachRemaining(retrievedRecords::add);
+
+ verifyRecords(useKeyFilter ? recordsForFilter : records,
retrievedRecords);
+ }
+ }
+
+ /**
+ * Tests the getRecordIterator method of HoodieAvroDataBlock with input
stream (read block lazily).
+ *
+ * @param useStreamingRead Whether to use streaming read.
+ * @param useKeyFilter Whether to use a key filter.
+ * @param keyFilterFullKeyMatch Whether the key filter should match the full
key.
+ * @throws IOException If an I/O error occurs during the test.
+ */
+ @ParameterizedTest
+ @CsvSource({
+ "true, true, true",
+ "true, true, false",
+ "true, false, true",
+ "true, false, false",
+ "false, true, true",
+ "false, true, false",
+ "false, false, true",
+ "false, false, false"
+ })
+ public void testGetRecordIteratorWithInputStream(boolean useStreamingRead,
boolean useKeyFilter, boolean keyFilterFullKeyMatch) throws IOException {
+ List<HoodieRecord> records = generateRandomHoodieRecords(SCHEMA, 1000);
+ byte[] blockContent = createHoodieAvroDataBlockContent(SCHEMA, records);
+ SeekableDataInputStream inputStream =
createSeekableDataInputStream(blockContent);
+ Map<HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HeaderMetadataType.SCHEMA, SCHEMA.toString());
+ HoodieLogBlockContentLocation logBlockContentLocation = new
HoodieLogBlockContentLocation(null, null, 0, blockContent.length,
blockContent.length);
+ int bufferSize = useStreamingRead ? 100 : 0; // bytes
+
+ // Expect records for using key filter
+ List<HoodieRecord> recordsForFilter = selectRandomRecords(records,
keyFilterFullKeyMatch);
+ List<String> keysForFilter =
recordsForFilter.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList());
+
+ // With input stream
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(() -> inputStream,
Option.empty(), true,
+ logBlockContentLocation, Option.of(SCHEMA), header, new HashMap<>(),
RECORD_KEY_FIELD);
+
+ // Get record iterator
+ try (ClosableIterator<HoodieRecord<Object>> recordIterator = useKeyFilter
+ ? dataBlock.getRecordIterator(keysForFilter, keyFilterFullKeyMatch,
HoodieRecord.HoodieRecordType.AVRO, bufferSize)
+ : dataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO,
bufferSize)) {
+ List<HoodieRecord> retrievedRecords = new ArrayList<>();
+ recordIterator.forEachRemaining(retrievedRecords::add);
+
+ verifyRecords(useKeyFilter ? recordsForFilter : records,
retrievedRecords);
+ }
+ }
+
+ /**
+ * Tests the getRecordIterator method of HoodieAvroDataBlock with streaming
read.
+ *
+ * @param bufferSize The size of the buffer to be used for the streaming
read.
+ * @throws IOException If an I/O error occurs during the test.
+ */
+ @ParameterizedTest
+ @ValueSource(ints = {Integer.MIN_VALUE, -2, -1, 0, 1, 2, 100, 200, 300,
1024, Integer.MAX_VALUE})
+ public void testGetRecordIteratorWithStreamingRead(int bufferSize) throws
IOException {
+ List<HoodieRecord> records = generateRandomHoodieRecords(SCHEMA, 1000);
+ byte[] blockContent = createHoodieAvroDataBlockContent(SCHEMA, records);
+ SeekableDataInputStream inputStream =
createSeekableDataInputStream(blockContent);
+ Map<HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HeaderMetadataType.SCHEMA, SCHEMA.toString());
+ HoodieLogBlockContentLocation logBlockContentLocation = new
HoodieLogBlockContentLocation(null, null, 0, blockContent.length,
blockContent.length);
+
+ // Create an instance of HoodieAvroDataBlock with streaming read
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(() -> inputStream,
Option.empty(), true,
+ logBlockContentLocation, Option.of(SCHEMA), header, new HashMap<>(),
RECORD_KEY_FIELD);
+
+ // Get record iterator
+ try (ClosableIterator<HoodieRecord<Object>> recordIterator =
dataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO, bufferSize)) {
+ List<HoodieRecord> retrievedRecords = new ArrayList<>();
+ recordIterator.forEachRemaining(retrievedRecords::add);
+
+ verifyRecords(records, retrievedRecords);
+ }
+ }
+
+ /**
+ * Tests the getRecordIterator method of HoodieAvroDataBlock with empty
content.
+ *
+ */
+ @Test
+ public void testGetRecordIteratorWithEmptyContent() {
+ byte[] content = new byte[0]; // Simulate empty content bytes
+ Option<byte[]> contentOption = Option.of(content);
+ Map<HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HeaderMetadataType.SCHEMA, SCHEMA.toString());
+
+ // With content
+ HoodieAvroDataBlock avroDataBlock = new HoodieAvroDataBlock(() -> null,
contentOption, false,
+ NULL_BLOCK_CONTENT_LOCATION, Option.of(SCHEMA), header, new
HashMap<>(), RECORD_KEY_FIELD);
+
+ // Mock the behavior of deserializeRecords to throw IOException
+ assertThrows(EOFException.class, () ->
avroDataBlock.deserializeRecords(content, HoodieRecord.HoodieRecordType.AVRO));
+
+ // Call getRecordIterator and verify the behavior
+ assertThrows(HoodieIOException.class, () ->
avroDataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO));
+ }
+
+ /**
+ * Tests the getRecordIterator method of HoodieAvroDataBlock with empty
content and input stream.
+ *
+ */
+ @ParameterizedTest
+ @ValueSource(ints = {Integer.MIN_VALUE, -2, -1, 0, 1, 2, 100, 200, 300,
1024, Integer.MAX_VALUE})
+ public void testGetRecordIteratorWithEmptyContentAndInputStream(int
bufferSize) throws IOException {
+ Map<HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HeaderMetadataType.SCHEMA, SCHEMA.toString());
+
+ // empty input stream supplier and empty content
+ HoodieAvroDataBlock avroDataBlock = new HoodieAvroDataBlock(() -> null,
Option.empty(), true,
+ NULL_BLOCK_CONTENT_LOCATION, Option.of(SCHEMA), header, new
HashMap<>(), RECORD_KEY_FIELD);
+
+ assertThrows(NullPointerException.class, () ->
avroDataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO,
bufferSize));
+ }
+
+ /**
+ * Generates a list of random HoodieRecord objects based on the provided
Avro schema.
+ *
+ * @param schema The Avro schema to use for generating the records.
+ * @param recordCount The number of records to generate.
+ * @return A list of HoodieRecord objects.
+ */
+ private static List<HoodieRecord> generateRandomHoodieRecords(Schema schema,
int recordCount) {
+ List<HoodieRecord> records = new ArrayList<>();
+ Random random = new Random();
+ for (int i = 0; i < recordCount; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ String recordKey = "key_" + i;
+ record.put("_hoodie_record_key", "key_" + i);
+ record.put("value", random.nextDouble());
+ records.add(new HoodieAvroIndexedRecord(new HoodieKey(recordKey, ""),
record));
+ }
+
+ return records;
+ }
+
+ /**
+ * Creates a HoodieAvroDataBlock object from the provided schema and list of
HoodieRecord objects.
+ *
+ * @param schema The Avro schema used to define the structure of the
records.
+ * @param records The list of HoodieRecord objects to be included in the
data block.
+ * @return A HoodieAvroDataBlock object containing the provided records and
schema.
+ */
+ private static HoodieAvroDataBlock createHoodieAvroDataBlock(Schema schema,
List<HoodieRecord> records) {
+ Map<HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HeaderMetadataType.SCHEMA, schema.toString());
+
+ return new HoodieAvroDataBlock(records, header, RECORD_KEY_FIELD);
+ }
+
+ /**
+ * Creates the content bytes of a HoodieAvroDataBlock from the provided
schema and list of HoodieRecord objects.
+ *
+ * @param schema The Avro schema used to define the structure of the
records.
+ * @param records The list of HoodieRecord objects to be included in the
data block.
+ * @return A byte array representing the content of the HoodieAvroDataBlock.
+ * @throws IOException If an I/O error occurs while generating the content
bytes.
+ */
+ private static byte[] createHoodieAvroDataBlockContent(Schema schema,
List<HoodieRecord> records) throws IOException {
+ Map<HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HeaderMetadataType.SCHEMA, schema.toString());
+
+ return new HoodieAvroDataBlock(records, header,
RECORD_KEY_FIELD).getContentBytes(null);
+ }
+
+ /**
+ * Creates a SeekableDataInputStream from the provided byte array content.
+ *
+ * @param content The byte array containing the data to be read.
+ * @return A SeekableDataInputStream that allows seeking within the provided
byte array content.
+ * @throws IOException If an I/O error occurs while creating the
SeekableDataInputStream.
+ */
+ private static SeekableDataInputStream createSeekableDataInputStream(byte[]
content) throws IOException {
+ return new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(content));
+ }
+
+ /**
+ * Selects a random subset of HoodieRecord objects from the provided list.
+ *
+ * @param records The list of HoodieRecord objects to sample from.
+ * @param fullKey A boolean flag indicating whether to match records by full
key or by prefix.
+ * If true, records are matched by full key; if false,
records are matched by key prefix.
+ * @return A list of randomly selected HoodieRecord objects.
+ */
+ private static List<HoodieRecord> selectRandomRecords(List<HoodieRecord>
records, boolean fullKey) {
+ // number of sampled records
+ int count = new Random().nextInt(records.size());
+ // set of record keys of the sampled records
+ Set<String> keys = new Random()
+ .ints(count, 0, records.size())
+ .mapToObj(records::get)
+ .map(r -> r.getRecordKey(SCHEMA, RECORD_KEY_FIELD))
+ .collect(Collectors.toSet());
+
+ // simulate KeyFilter matching logic
+ return records.stream()
+ .filter(r -> fullKey
+ ? keys.contains(r.getRecordKey(SCHEMA, RECORD_KEY_FIELD))
+ : keys.stream().anyMatch(r.getRecordKey(SCHEMA,
RECORD_KEY_FIELD)::startsWith))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Verifies that the actual records match the expected records.
+ * This method converts the HoodieRecord objects to GenericRecord objects,
sorts them,
+ * and then compares the sorted lists for equality.
+ *
+ * @param expectedRecords The list of expected HoodieRecord objects.
+ * @param actualRecords The list of actual HoodieRecord objects to verify.
+ * @throws AssertionError If the record counts do not match or if any record
data mismatches.
+ */
+ private static void verifyRecords(List<HoodieRecord> expectedRecords,
List<HoodieRecord> actualRecords) {
+ List<GenericRecord> expectedGenericRecords =
convertAndSortRecords(expectedRecords);
+ List<GenericRecord> actualGenericRecords =
convertAndSortRecords(actualRecords);
+
+ assertEquals(expectedGenericRecords.size(), actualGenericRecords.size(),
"Record count mismatch");
+ for (int i = 0; i < expectedGenericRecords.size(); i++) {
+ assertEquals(expectedGenericRecords, actualGenericRecords, "Record data
mismatch");
+ }
+ }
+
+ private static List<GenericRecord> convertAndSortRecords(List<HoodieRecord>
hoodieRecords) {
+ return hoodieRecords.stream()
+ .map(HoodieRecord::getData)
+ .map(r -> (GenericRecord) r)
+ .sorted(Comparator.comparing(r ->
r.get(RECORD_KEY_FIELD).toString()))
+ .collect(Collectors.toList());
+ }
+}