This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b40d22a67d1 [HUDI-8190] Implement efficient streaming reads for HoodieDataBlocks (#11924)
b40d22a67d1 is described below

commit b40d22a67d1aa25d882b22b39ba858129f509427
Author: usberkeley <[email protected]>
AuthorDate: Mon Feb 10 10:35:15 2025 +0800

    [HUDI-8190] Implement efficient streaming reads for HoodieDataBlocks (#11924)
---
 .../table/log/block/HoodieAvroDataBlock.java       | 175 ++++++++++
 .../common/table/log/block/HoodieDataBlock.java    |  81 ++++-
 .../table/log/block/HoodieHFileDataBlock.java      |  18 +
 .../common/table/log/block/HoodieLogBlock.java     |   4 +
 .../table/log/block/HoodieParquetDataBlock.java    |  18 +
 .../table/log/block/TestHoodieAvroDataBlock.java   | 373 +++++++++++++++++++++
 6 files changed, 666 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 488ffb1ce43..0a440919ab6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.io.SeekableDataInputStream;
@@ -52,6 +53,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -146,6 +148,26 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
     return new CloseableMappingIterator<>(iterator, data -> (HoodieRecord<T>) 
new HoodieAvroIndexedRecord(data));
   }
 
+  /**
+   * Streaming deserialization of records.
+   *
+   * <p>Unlike {@link #deserializeRecords(byte[], HoodieRecordType)}, the block content is read directly from
+   * {@code inputStream} through a bounded buffer, so the whole block never has to be held in memory at once.
+   * The returned iterator owns {@code inputStream} and closes it when the iterator is closed.
+   *
+   * @param inputStream     The input stream from which to read the records.
+   * @param contentLocation The location within the input stream where the content starts.
+   * @param type            The record type requested by the caller; records are always produced as
+   *                        {@link HoodieAvroIndexedRecord} by this block.
+   * @param bufferSize      The size of the buffer to use for reading the records; must be greater than zero.
+   * @param <T>             The type parameter for HoodieRecord.
+   * @return A ClosableIterator over {@code HoodieRecord<T>}.
+   * @throws IOException If there is an error reading or deserializing the records.
+   */
+  @Override
+  protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(
+      SeekableDataInputStream inputStream,
+      HoodieLogBlockContentLocation contentLocation,
+      HoodieRecordType type,
+      int bufferSize
+  ) throws IOException {
+    StreamingRecordIterator iterator = StreamingRecordIterator.getInstance(this, inputStream, contentLocation, bufferSize);
+    return new CloseableMappingIterator<>(iterator, data -> (HoodieRecord<T>) new HoodieAvroIndexedRecord(data));
+  }
+
   @Override
   protected <T> ClosableIterator<T> deserializeRecords(HoodieReaderContext<T> 
readerContext, byte[] content) throws IOException {
     checkState(this.readerSchema != null, "Reader's schema has to be 
non-null");
@@ -221,6 +243,159 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
     }
   }
 
+  /**
+   * {@code StreamingRecordIterator} is an iterator for reading records from a Hoodie log block in streaming manner.
+   * It decodes the given input stream into Avro records with optional schema promotion.
+   *
+   * <p>Layout consumed from the stream: a 4-byte block version, a 4-byte record count (only when the version
+   * carries one), then one {@code [4-byte length][serialized record]} pair per record.
+   *
+   * <p>Records are staged through a {@link ByteBuffer} kept in read mode (the bytes between position and limit
+   * are the unread data). The buffer is compacted when its tail is too short for the next read and grown when a
+   * single record is larger than its capacity, so it never holds much more than the larger of the configured
+   * buffer size and the largest record.
+   */
+  private static class StreamingRecordIterator implements ClosableIterator<IndexedRecord> {
+    // Size of the length prefix written before every serialized record
+    private static final int RECORD_LENGTH_BYTES = 4;
+    // The minimum buffer size in bytes: the buffer must at least fit one record length prefix
+    private static final int MIN_BUFFER_SIZE = RECORD_LENGTH_BYTES;
+    private final SeekableDataInputStream inputStream;
+    private final GenericDatumReader<IndexedRecord> reader;
+    // Reused across records so that a new decoder is not allocated for every record
+    private final ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
+    private Option<Schema> promotedSchema = Option.empty();
+    private int totalRecords = 0;
+    private int readRecords = 0;
+    private ByteBuffer buffer;
+
+    private StreamingRecordIterator(Schema readerSchema, Schema writerSchema, SeekableDataInputStream inputStream,
+        HoodieLogBlockContentLocation contentLocation, int bufferSize) throws IOException {
+      // Non-positive sizes are how callers request a non-streaming read, so they must never reach this iterator.
+      checkArgument(bufferSize > 0, "Buffer size must be greater than zero");
+      bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
+
+      this.inputStream = inputStream;
+
+      // Seek to the start of the block content
+      this.inputStream.seek(contentLocation.getContentPositionInLogFile());
+
+      // Read version for this data block
+      int version = this.inputStream.readInt();
+      if (new HoodieAvroDataBlockVersion(version).hasRecordCount()) {
+        this.totalRecords = this.inputStream.readInt();
+      }
+
+      if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema)) {
+        // Decode with the writer schema, then rewrite each record into the promoted reader schema
+        this.reader = new GenericDatumReader<>(writerSchema, writerSchema);
+        this.promotedSchema = Option.of(readerSchema);
+      } else {
+        this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+      }
+
+      // Never allocate more than the block needs. The comparison is done on longs so that blocks of 2GB or more
+      // do not fail in Math.toIntExact; the result is bounded by bufferSize, so narrowing back to int is safe.
+      long initialCapacity = Math.max(MIN_BUFFER_SIZE, Math.min((long) bufferSize, contentLocation.getBlockSize()));
+      this.buffer = ByteBuffer.allocate((int) initialCapacity);
+      // Start in read mode with no unread data (position == limit == 0)
+      this.buffer.flip();
+    }
+
+    public static StreamingRecordIterator getInstance(HoodieAvroDataBlock dataBlock, SeekableDataInputStream inputStream,
+        HoodieLogBlockContentLocation contentLocation, int bufferSize) throws IOException {
+      return new StreamingRecordIterator(dataBlock.readerSchema, dataBlock.getSchemaFromHeader(), inputStream, contentLocation, bufferSize);
+    }
+
+    @Override
+    public void close() {
+      this.decoderCache.remove();
+      this.buffer = null;
+      try {
+        this.inputStream.close();
+      } catch (IOException ex) {
+        throw new HoodieIOException("Failed to close input stream", ex);
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return readRecords < totalRecords;
+    }
+
+    @Override
+    public IndexedRecord next() {
+      // fillBuffer() may read past the end of this block, so decoding beyond totalRecords would interpret
+      // bytes of whatever follows the block; honour the Iterator contract instead.
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException("No more records in the log block");
+      }
+      try {
+        ensureBufferHasData(RECORD_LENGTH_BYTES);
+
+        // Read the record length
+        int recordLength = buffer.getInt();
+        if (recordLength < 0) {
+          throw new HoodieException("Invalid negative record length " + recordLength + " in log block, data may be corrupted");
+        }
+
+        // Ensure buffer is large enough and has enough data
+        ensureBufferHasData(recordLength);
+
+        // Decode the record straight from the backing array, reusing the cached decoder
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(buffer.array(), buffer.position(), recordLength, this.decoderCache.get());
+        this.decoderCache.set(decoder);
+        IndexedRecord record = this.reader.read(null, decoder);
+        // Advance past the whole payload, independent of how the decoder tracks its own position
+        buffer.position(buffer.position() + recordLength);
+        this.readRecords++;
+        if (this.promotedSchema.isPresent()) {
+          return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get());
+        }
+        return record;
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to convert bytes to record", e);
+      }
+    }
+
+    /**
+     * Ensures that the buffer contains at least the specified amount of data.
+     *
+     * <p>This method checks if the buffer has the required amount of data. If not, it attempts to fill the buffer
+     * by reading more data from the input stream. If the buffer's capacity is insufficient, it allocates a larger buffer.
+     * If the end of the input stream is reached before the required amount of data is available, an exception is thrown.
+     *
+     * @param dataLength the amount of data (in bytes) that must be available in the buffer; must not be negative.
+     * @throws IOException if an I/O error occurs while reading from the input stream.
+     * @throws HoodieException if the end of the input stream is reached before the required amount of data is available.
+     */
+    private void ensureBufferHasData(int dataLength) throws IOException {
+      // Not enough room between the read position and the end of the buffer: move unread bytes to the front
+      if (buffer.capacity() - buffer.position() < dataLength) {
+        buffer.compact();
+        // compact() leaves the buffer in write mode; switch back to read mode
+        buffer.flip();
+      }
+
+      // Still not enough room after compaction: the record is larger than the buffer, so grow it
+      if (buffer.capacity() - buffer.position() < dataLength) {
+        ByteBuffer newBuffer = ByteBuffer.allocate(buffer.position() + dataLength);
+        newBuffer.put(buffer);
+        // Reset the new buffer to read mode
+        newBuffer.flip();
+        buffer = newBuffer;
+      }
+
+      while (buffer.remaining() < dataLength) {
+        boolean hasMoreData = fillBuffer();
+        if (!hasMoreData && buffer.remaining() < dataLength) {
+          throw new HoodieException("Unable to read enough data from the input stream to fill the buffer");
+        }
+      }
+    }
+
+    /**
+     * Attempts to fill the buffer with more data from the input stream.
+     *
+     * <p>This method reads data from the input stream into the buffer, starting at the current limit
+     * and reading up to the capacity of the buffer. If the end of the input stream is reached,
+     * it returns false. Otherwise, it updates the buffer's limit to reflect the new data and returns true.
+     * The read is not bounded by the block size, so it may pull in bytes that follow this block; they are never
+     * decoded because {@link #next()} stops after {@code totalRecords} records.
+     *
+     * @return true if data was successfully read into the buffer; false if the end of the input stream was reached.
+     * @throws IOException if an I/O error occurs while reading from the input stream.
+     */
+    private boolean fillBuffer() throws IOException {
+      int bytesRead = inputStream.read(buffer.array(), buffer.limit(), buffer.capacity() - buffer.limit());
+      if (bytesRead == -1) {
+        return false;
+      }
+
+      buffer.limit(buffer.limit() + bytesRead);
+      return true;
+    }
+  }
+
+
   
//----------------------------------------------------------------------------------------
   //                                  DEPRECATED METHODS
   //
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 6f48cb27a74..c2a707c8303 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -142,16 +142,38 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   }
 
   /**
-   * Returns all the records iterator contained w/in this block.
+   * Returns an iterator over all the records contained within this block.
+   * Streaming read is disabled (buffer size 0): in-memory records or content are used when present,
+   * otherwise the entire Data Block content is read at once.
+   *
+   * @param type The type of HoodieRecord.
+   * @param <T>  The type parameter for HoodieRecord.
+   * @return A ClosableIterator over {@code HoodieRecord<T>}.
+   * @throws HoodieIOException If there is an error reading records from the block payload.
    */
   public final <T> ClosableIterator<HoodieRecord<T>> 
getRecordIterator(HoodieRecordType type) {
+    return getRecordIterator(type, 0);
+  }
+
+  /**
+   * Returns an iterator over all the records contained within this block.
+   *
+   * @param type       The type of HoodieRecord.
+   * @param bufferSize The size of the buffer for streaming read.
+   *                   A bufferSize less than or equal to 0 means that 
streaming read is disabled and
+   *                   the entire block content will be read at once.
+   *                   A bufferSize greater than 0 enables streaming read with 
the specified buffer size.
+   * @param <T>        The type parameter for HoodieRecord.
+   * @return A ClosableIterator over HoodieRecord<T>.
+   * @throws HoodieIOException If there is an error reading records from the 
block payload.
+   */
+  public final <T> ClosableIterator<HoodieRecord<T>> 
getRecordIterator(HoodieRecordType type, int bufferSize) {
     if (records.isPresent()) {
       // TODO need convert record type
       return list2Iterator(unsafeCast(records.get()));
     }
     try {
       // in case records are absent, read content lazily and then convert to 
IndexedRecords
-      return readRecordsFromBlockPayload(type);
+      return readRecordsFromBlockPayload(type, bufferSize);
     } catch (IOException io) {
       throw new HoodieIOException("Unable to convert content bytes to 
records", io);
     }
@@ -170,6 +192,23 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
    * @throws IOException in case of failures encountered when reading/parsing 
records
    */
   public final <T> ClosableIterator<HoodieRecord<T>> 
getRecordIterator(List<String> keys, boolean fullKey, HoodieRecordType type) 
throws IOException {
+    // Buffer size 0 disables streaming read: the block content is read at once
+    return getRecordIterator(keys, fullKey, type, 0);
+  }
+
+  /**
+   * Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
+   * do a seek based parsing and return matched entries.
+   *
+   * @param keys       keys of interest; an empty list means a full scan that returns every record.
+   * @param fullKey    whether {@code keys} are full record keys (passed on to the lookup/filtering logic).
+   * @param type       The type of HoodieRecord.
+   * @param bufferSize The size of the buffer for streaming read.
+   *                   A bufferSize less than or equal to 0 means that streaming read is disabled and
+   *                   the entire block content will be read at once.
+   *                   A bufferSize greater than 0 enables streaming read with the specified buffer size.
+   * @param <T>        The type parameter for HoodieRecord.
+   * @return An iterator over the records for the keys of interest.
+   * @throws IOException in case of failures encountered when reading/parsing records
+   */
+  public final <T> ClosableIterator<HoodieRecord<T>> 
getRecordIterator(List<String> keys, boolean fullKey, HoodieRecordType type, 
int bufferSize) throws IOException {
     boolean fullScan = keys.isEmpty();
     if (enablePointLookups && !fullScan) {
       return lookupRecords(keys, fullKey);
@@ -177,7 +216,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
 
     // Otherwise, we fetch all the records and filter out all the records, but 
the
     // ones requested
-    ClosableIterator<HoodieRecord<T>> allRecords = getRecordIterator(type);
+    ClosableIterator<HoodieRecord<T>> allRecords = getRecordIterator(type, 
bufferSize);
     if (fullScan) {
       return allRecords;
     }
@@ -246,6 +285,26 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     }
   }
 
+  /**
+   * Reads records from the block payload, streaming them through a bounded buffer when possible.
+   *
+   * <p>Streaming is used only when the block content has not already been loaded into memory and
+   * {@code bufferSize} is positive; otherwise this delegates to {@link #readRecordsFromBlockPayload(HoodieRecordType)},
+   * which works on the entire block content.
+   *
+   * @param type       The type of HoodieRecord.
+   * @param bufferSize The size of the buffer for streaming read.
+   *                   A bufferSize less than or equal to 0 means that streaming read is disabled and
+   *                   the entire block content will be read at once.
+   *                   A bufferSize greater than 0 enables streaming read with the specified buffer size.
+   * @param <T>        The type parameter for HoodieRecord.
+   * @return A ClosableIterator over {@code HoodieRecord<T>}.
+   * @throws IOException If there is an error reading or deserializing the records.
+   */
+  protected <T> ClosableIterator<HoodieRecord<T>> readRecordsFromBlockPayload(HoodieRecordType type, int bufferSize) throws IOException {
+    boolean contentAlreadyLoaded = getContent().isPresent();
+    boolean streamingRequested = bufferSize > 0;
+    if (streamingRequested && !contentAlreadyLoaded) {
+      SeekableDataInputStream blockStream = getInputStreamSupplier().get();
+      return deserializeRecords(blockStream, getBlockContentLocation().get(), type, bufferSize);
+    }
+    return readRecordsFromBlockPayload(type);
+  }
+
+
   protected <T> ClosableIterator<T> 
readRecordsFromBlockPayload(HoodieReaderContext<T> readerContext) throws 
IOException {
     if (readBlockLazily && !getContent().isPresent()) {
       // read log block contents from disk
@@ -270,6 +329,22 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
 
+  /**
+   * Deserializes the given block content bytes into records of the requested type (non-streaming path).
+   */
   protected abstract <T> ClosableIterator<HoodieRecord<T>> 
deserializeRecords(byte[] content, HoodieRecordType type) throws IOException;
 
+  /**
+   * Streaming deserialization of records.
+   *
+   * <p>Called by {@code readRecordsFromBlockPayload(HoodieRecordType, int)} only when the block content is not
+   * loaded in memory and a positive buffer size was requested. Implementations that support streaming read the
+   * content from {@code inputStream} starting at {@code contentLocation} through a buffer of {@code bufferSize}
+   * bytes instead of loading the whole content at once.
+   *
+   * @param inputStream     The input stream from which to read the records.
+   * @param contentLocation The location within the input stream where the content starts.
+   * @param type            The type of HoodieRecord.
+   * @param bufferSize      The size of the buffer to use for reading the records.
+   * @param <T>             The type parameter for HoodieRecord.
+   * @return A ClosableIterator over {@code HoodieRecord<T>}.
+   * @throws IOException If there is an error reading or deserializing the records.
+   */
+  protected abstract <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(
+      SeekableDataInputStream inputStream,
+      HoodieLogBlockContentLocation contentLocation,
+      HoodieRecordType type,
+      int bufferSize
+  ) throws IOException;
+
   /**
    * Deserializes the content bytes of the data block to the records in 
engine-specific representation.
    *
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 4d1267701ba..ea474142463 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -125,6 +125,24 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     }
   }
 
+  /**
+   * Streaming deserialization is not implemented for HFile blocks. Instead of failing, this falls back to the
+   * regular {@link #readRecordsFromBlockPayload(HoodieRecordType)} path, so callers that request a streaming
+   * buffer size for every block type still get this block's records.
+   *
+   * @param inputStream     The stream handed over for streaming read; it is not used and is closed here.
+   * @param contentLocation The location within the input stream where the content starts (unused).
+   * @param type            The type of HoodieRecord.
+   * @param bufferSize      The requested streaming buffer size (unused).
+   * @param <T>             The type parameter for HoodieRecord.
+   * @return A ClosableIterator over {@code HoodieRecord<T>}, produced by the non-streaming read path.
+   * @throws IOException If there is an error closing the stream or reading/deserializing the records.
+   */
+  @Override
+  protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(
+      SeekableDataInputStream inputStream,
+      HoodieLogBlockContentLocation contentLocation,
+      HoodieRecordType type,
+      int bufferSize
+  ) throws IOException {
+    // The streaming reader takes ownership of (and closes) the stream it is given; release it before
+    // taking the non-streaming path so it is not leaked.
+    if (inputStream != null) {
+      inputStream.close();
+    }
+    return readRecordsFromBlockPayload(type);
+  }
+
   @Override
   protected <T> ClosableIterator<T> deserializeRecords(HoodieReaderContext<T> 
readerContext, byte[] content) throws IOException {
     checkState(readerSchema != null, "Reader's schema has to be non-null");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 04c7827b4e4..78c39ff5faf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -345,6 +345,10 @@ public abstract class HoodieLogBlock {
     return Option.of(content);
   }
 
+  /**
+   * Returns the supplier of the {@link SeekableDataInputStream} this block's content can be read from;
+   * used by {@link HoodieDataBlock} to open a stream for streaming reads.
+   */
+  protected Supplier<SeekableDataInputStream> getInputStreamSupplier() {
+    return inputStreamSupplier;
+  }
+
   /**
    * Adds the record positions if the base file instant time of the positions 
exists
    * in the log header and the record positions are all valid.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 8d9b1221c54..35c0a0f8536 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -159,6 +159,24 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
     throw new UnsupportedOperationException("Should not be invoked");
   }
 
+  /**
+   * Streaming deserialization is not implemented for Parquet blocks. Instead of failing, this falls back to the
+   * regular {@link #readRecordsFromBlockPayload(HoodieRecordType)} path, so callers that request a streaming
+   * buffer size for every block type still get this block's records.
+   *
+   * @param inputStream     The stream handed over for streaming read; it is not used and is closed here.
+   * @param contentLocation The location within the input stream where the content starts (unused).
+   * @param type            The type of HoodieRecord.
+   * @param bufferSize      The requested streaming buffer size (unused).
+   * @param <T>             The type parameter for HoodieRecord.
+   * @return A ClosableIterator over {@code HoodieRecord<T>}, produced by the non-streaming read path.
+   * @throws IOException If there is an error closing the stream or reading/deserializing the records.
+   */
+  @Override
+  protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(
+      SeekableDataInputStream inputStream,
+      HoodieLogBlockContentLocation contentLocation,
+      HoodieRecordType type,
+      int bufferSize
+  ) throws IOException {
+    // The streaming reader takes ownership of (and closes) the stream it is given; release it before
+    // taking the non-streaming path so it is not leaked.
+    if (inputStream != null) {
+      inputStream.close();
+    }
+    return readRecordsFromBlockPayload(type);
+  }
+
   @Override
   protected <T> ClosableIterator<T> deserializeRecords(HoodieReaderContext<T> 
readerContext, byte[] content) throws IOException {
     throw new UnsupportedOperationException("Should not be invoked");
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieAvroDataBlock.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieAvroDataBlock.java
new file mode 100644
index 00000000000..98d918c1cb5
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieAvroDataBlock.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation;
+import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestHoodieAvroDataBlock {
+  // Record key field of the test data
+  private static final String RECORD_KEY_FIELD = HoodieRecord.RECORD_KEY_METADATA_FIELD;
+
+  // Avro schema of the test records: a string record key plus a double value
+  private static final String SCHEMA_STRING = "{\n"
+      + "  \"type\": \"record\",\n"
+      + "  \"name\": \"RandomRecord\",\n"
+      + "  \"fields\": [\n"
+      + "    {\"name\": \"" + RECORD_KEY_FIELD + "\", \"type\": \"string\"},\n"
+      + "    {\"name\": \"value\", \"type\": \"double\"}\n"
+      + "  ]\n"
+      + "}";
+
+  private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING);
+
+  // Placeholder location for blocks that are never read from a log file. Passing null instead would fail when
+  // the HoodieAvroDataBlock constructor wraps the content location in an Option.
+  private static final HoodieLogBlockContentLocation NULL_BLOCK_CONTENT_LOCATION = new HoodieLogBlockContentLocation(null, null, 0, 0, 0);
+
+  /**
+   * Verifies {@code getRecordIterator} on a block built from in-memory records, with and without a key filter.
+   *
+   * @param useKeyFilter          Whether to use a key filter.
+   * @param keyFilterFullKeyMatch Whether the key filter should match the full key.
+   * @throws IOException If an I/O error occurs during the test.
+   */
+  @ParameterizedTest
+  @CsvSource({
+      "true, true",
+      "true, false",
+      "false, true",
+      "false, false"
+  })
+  public void testGetRecordIteratorWithRecordsPresent(boolean useKeyFilter, boolean keyFilterFullKeyMatch) throws IOException {
+    List<HoodieRecord> allRecords = generateRandomHoodieRecords(SCHEMA, 1000);
+    HoodieAvroDataBlock avroBlock = createHoodieAvroDataBlock(SCHEMA, allRecords);
+
+    // Subset (and its keys) expected back when the key filter is applied
+    List<HoodieRecord> filteredRecords = selectRandomRecords(allRecords, keyFilterFullKeyMatch);
+    List<String> filterKeys = filteredRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList());
+    List<HoodieRecord> expected = useKeyFilter ? filteredRecords : allRecords;
+
+    ClosableIterator<HoodieRecord<Object>> iterator;
+    if (useKeyFilter) {
+      iterator = avroBlock.getRecordIterator(filterKeys, keyFilterFullKeyMatch, HoodieRecord.HoodieRecordType.AVRO);
+    } else {
+      iterator = avroBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO);
+    }
+    try (ClosableIterator<HoodieRecord<Object>> recordIterator = iterator) {
+      List<HoodieRecord> retrieved = new ArrayList<>();
+      while (recordIterator.hasNext()) {
+        retrieved.add(recordIterator.next());
+      }
+      verifyRecords(expected, retrieved);
+    }
+  }
+
+  /**
+   * Verifies {@code getRecordIterator} on a block built from serialized content bytes that is not read lazily,
+   * with and without a key filter.
+   *
+   * @param useKeyFilter          Whether to use a key filter.
+   * @param keyFilterFullKeyMatch Whether the key filter should match the full key.
+   * @throws IOException If an I/O error occurs during the test.
+   */
+  @ParameterizedTest
+  @CsvSource({
+      "true, true",
+      "true, false",
+      "false, true",
+      "false, false"
+  })
+  public void testGetRecordIteratorWithContent(boolean useKeyFilter, boolean keyFilterFullKeyMatch) throws IOException {
+    List<HoodieRecord> allRecords = generateRandomHoodieRecords(SCHEMA, 1000);
+    Option<byte[]> serializedContent = Option.of(createHoodieAvroDataBlockContent(SCHEMA, allRecords));
+    Map<HeaderMetadataType, String> blockHeader = new HashMap<>();
+    blockHeader.put(HeaderMetadataType.SCHEMA, SCHEMA.toString());
+
+    // Subset (and its keys) expected back when the key filter is applied
+    List<HoodieRecord> filteredRecords = selectRandomRecords(allRecords, keyFilterFullKeyMatch);
+    List<String> filterKeys = filteredRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList());
+
+    // Content is supplied up front and the block is not read lazily
+    HoodieAvroDataBlock contentBlock = new HoodieAvroDataBlock(() -> null, serializedContent, false,
+        NULL_BLOCK_CONTENT_LOCATION, Option.of(SCHEMA), blockHeader, new HashMap<>(), RECORD_KEY_FIELD);
+
+    ClosableIterator<HoodieRecord<Object>> iterator = useKeyFilter
+        ? contentBlock.getRecordIterator(filterKeys, keyFilterFullKeyMatch, HoodieRecord.HoodieRecordType.AVRO)
+        : contentBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO);
+    try (ClosableIterator<HoodieRecord<Object>> recordIterator = iterator) {
+      List<HoodieRecord> retrieved = new ArrayList<>();
+      recordIterator.forEachRemaining(retrieved::add);
+      verifyRecords(useKeyFilter ? filteredRecords : allRecords, retrieved);
+    }
+  }
+
+  /**
+   * Verifies {@code getRecordIterator} on a lazily-read block backed by an input stream, covering streaming and
+   * non-streaming reads, with and without a key filter.
+   *
+   * @param useStreamingRead      Whether to use streaming read.
+   * @param useKeyFilter          Whether to use a key filter.
+   * @param keyFilterFullKeyMatch Whether the key filter should match the full key.
+   * @throws IOException If an I/O error occurs during the test.
+   */
+  @ParameterizedTest
+  @CsvSource({
+      "true, true, true",
+      "true, true, false",
+      "true, false, true",
+      "true, false, false",
+      "false, true, true",
+      "false, true, false",
+      "false, false, true",
+      "false, false, false"
+  })
+  public void testGetRecordIteratorWithInputStream(boolean useStreamingRead, boolean useKeyFilter, boolean keyFilterFullKeyMatch) throws IOException {
+    List<HoodieRecord> allRecords = generateRandomHoodieRecords(SCHEMA, 1000);
+    byte[] serialized = createHoodieAvroDataBlockContent(SCHEMA, allRecords);
+    SeekableDataInputStream stream = createSeekableDataInputStream(serialized);
+    Map<HeaderMetadataType, String> blockHeader = new HashMap<>();
+    blockHeader.put(HeaderMetadataType.SCHEMA, SCHEMA.toString());
+    HoodieLogBlockContentLocation location =
+        new HoodieLogBlockContentLocation(null, null, 0, serialized.length, serialized.length);
+    // A small positive buffer (in bytes) exercises repeated refills; 0 disables streaming read
+    int bufferSize;
+    if (useStreamingRead) {
+      bufferSize = 100;
+    } else {
+      bufferSize = 0;
+    }
+
+    // Subset (and its keys) expected back when the key filter is applied
+    List<HoodieRecord> filteredRecords = selectRandomRecords(allRecords, keyFilterFullKeyMatch);
+    List<String> filterKeys = filteredRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList());
+
+    HoodieAvroDataBlock lazyBlock = new HoodieAvroDataBlock(() -> stream, Option.empty(), true,
+        location, Option.of(SCHEMA), blockHeader, new HashMap<>(), RECORD_KEY_FIELD);
+
+    ClosableIterator<HoodieRecord<Object>> iterator = useKeyFilter
+        ? lazyBlock.getRecordIterator(filterKeys, keyFilterFullKeyMatch, HoodieRecord.HoodieRecordType.AVRO, bufferSize)
+        : lazyBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO, bufferSize);
+    try (ClosableIterator<HoodieRecord<Object>> recordIterator = iterator) {
+      List<HoodieRecord> retrieved = new ArrayList<>();
+      recordIterator.forEachRemaining(retrieved::add);
+      verifyRecords(useKeyFilter ? filteredRecords : allRecords, retrieved);
+    }
+  }
+
+  /**
+   * Verifies that a lazily-read block returns every record for a wide range of buffer sizes, including
+   * non-positive sizes (streaming disabled), tiny sizes that force buffer growth and very large sizes.
+   *
+   * @param bufferSize The size of the buffer to be used for the streaming read.
+   * @throws IOException If an I/O error occurs during the test.
+   */
+  @ParameterizedTest
+  @ValueSource(ints = {Integer.MIN_VALUE, -2, -1, 0, 1, 2, 100, 200, 300, 1024, Integer.MAX_VALUE})
+  public void testGetRecordIteratorWithStreamingRead(int bufferSize) throws IOException {
+    List<HoodieRecord> expectedRecords = generateRandomHoodieRecords(SCHEMA, 1000);
+    byte[] serialized = createHoodieAvroDataBlockContent(SCHEMA, expectedRecords);
+    SeekableDataInputStream stream = createSeekableDataInputStream(serialized);
+    Map<HeaderMetadataType, String> blockHeader = new HashMap<>();
+    blockHeader.put(HeaderMetadataType.SCHEMA, SCHEMA.toString());
+    HoodieLogBlockContentLocation location =
+        new HoodieLogBlockContentLocation(null, null, 0, serialized.length, serialized.length);
+
+    HoodieAvroDataBlock lazyBlock = new HoodieAvroDataBlock(() -> stream, Option.empty(), true,
+        location, Option.of(SCHEMA), blockHeader, new HashMap<>(), RECORD_KEY_FIELD);
+
+    try (ClosableIterator<HoodieRecord<Object>> recordIterator =
+             lazyBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO, bufferSize)) {
+      List<HoodieRecord> actualRecords = new ArrayList<>();
+      while (recordIterator.hasNext()) {
+        actualRecords.add(recordIterator.next());
+      }
+      verifyRecords(expectedRecords, actualRecords);
+    }
+  }
+
+  /**
+   * Tests that a block whose in-memory content is empty cannot be read. Direct deserialization hits
+   * end-of-stream ({@link EOFException}), and {@code getRecordIterator} surfaces the failure as a
+   * {@link HoodieIOException}.
+   */
+  @Test
+  public void testGetRecordIteratorWithEmptyContent() {
+    byte[] content = new byte[0]; // Simulate empty content bytes
+    Option<byte[]> contentOption = Option.of(content);
+    Map<HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HeaderMetadataType.SCHEMA, SCHEMA.toString());
+
+    // With content
+    HoodieAvroDataBlock avroDataBlock = new HoodieAvroDataBlock(() -> null, contentOption, false,
+        NULL_BLOCK_CONTENT_LOCATION, Option.of(SCHEMA), header, new HashMap<>(), RECORD_KEY_FIELD);
+
+    // Deserializing empty bytes runs out of input, so an EOFException is expected
+    assertThrows(EOFException.class, () -> avroDataBlock.deserializeRecords(content, HoodieRecord.HoodieRecordType.AVRO));
+
+    // getRecordIterator wraps the underlying IOException into a HoodieIOException
+    assertThrows(HoodieIOException.class, () -> avroDataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO));
+  }
+
+  /**
+   * Tests that {@code getRecordIterator} fails when the block has neither in-memory content nor a usable input
+   * stream (the stream supplier returns {@code null}), for both streaming and non-streaming buffer sizes.
+   *
+   * @param bufferSize The buffer size passed to getRecordIterator; non-positive values disable streaming read.
+   * @throws IOException If an I/O error occurs during the test.
+   */
+  @ParameterizedTest
+  @ValueSource(ints = {Integer.MIN_VALUE, -2, -1, 0, 1, 2, 100, 200, 300, 1024, Integer.MAX_VALUE})
+  public void testGetRecordIteratorWithEmptyContentAndInputStream(int bufferSize) throws IOException {
+    Map<HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HeaderMetadataType.SCHEMA, SCHEMA.toString());
+
+    // No content, and the input stream supplier yields null
+    HoodieAvroDataBlock avroDataBlock = new HoodieAvroDataBlock(() -> null, Option.empty(), true,
+        NULL_BLOCK_CONTENT_LOCATION, Option.of(SCHEMA), header, new HashMap<>(), RECORD_KEY_FIELD);
+
+    assertThrows(NullPointerException.class, () -> avroDataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO, bufferSize));
+  }
+
+  /**
+   * Generates a list of random HoodieRecord objects based on the provided Avro schema.
+   * Record {@code i} has key {@code "key_" + i}, stored both in the record-key field and in its HoodieKey,
+   * and a random double {@code value}.
+   *
+   * @param schema      The Avro schema to use for generating the records.
+   * @param recordCount The number of records to generate.
+   * @return A list of HoodieRecord objects.
+   */
+  private static List<HoodieRecord> generateRandomHoodieRecords(Schema schema, int recordCount) {
+    List<HoodieRecord> records = new ArrayList<>();
+    Random random = new Random();
+    for (int i = 0; i < recordCount; i++) {
+      String recordKey = "key_" + i;
+      GenericRecord record = new GenericData.Record(schema);
+      // Use the same constant the schema is built from, instead of a duplicated field-name literal
+      record.put(RECORD_KEY_FIELD, recordKey);
+      record.put("value", random.nextDouble());
+      records.add(new HoodieAvroIndexedRecord(new HoodieKey(recordKey, ""), record));
+    }
+
+    return records;
+  }
+
+  /**
+   * Builds an in-memory HoodieAvroDataBlock holding the given records.
+   * The block header carries the schema's JSON form under {@code HeaderMetadataType.SCHEMA},
+   * and {@code RECORD_KEY_FIELD} is passed as the key field.
+   *
+   * @param schema  The Avro schema whose string form is written to the block header.
+   * @param records The HoodieRecord objects to place in the block.
+   * @return A new HoodieAvroDataBlock wrapping {@code records}.
+   */
+  private static HoodieAvroDataBlock createHoodieAvroDataBlock(Schema schema, List<HoodieRecord> records) {
+    Map<HeaderMetadataType, String> blockHeader = new HashMap<>();
+    String schemaJson = schema.toString();
+    blockHeader.put(HeaderMetadataType.SCHEMA, schemaJson);
+
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, blockHeader, RECORD_KEY_FIELD);
+    return dataBlock;
+  }
+
+  /**
+   * Creates the serialized content bytes of a HoodieAvroDataBlock built from the provided
+   * schema and records.
+   *
+   * <p>The block is created through {@link #createHoodieAvroDataBlock(Schema, List)}, so the
+   * header setup is defined in a single place. {@code getContentBytes} is called with
+   * {@code null}. Presumably its argument (a storage handle) is not needed for Avro blocks;
+   * TODO confirm against HoodieAvroDataBlock.
+   *
+   * @param schema  The Avro schema used to define the structure of the records.
+   * @param records The list of HoodieRecord objects to be included in the data block.
+   * @return A byte array representing the content of the HoodieAvroDataBlock.
+   * @throws IOException If an I/O error occurs while generating the content bytes.
+   */
+  private static byte[] createHoodieAvroDataBlockContent(Schema schema, List<HoodieRecord> records) throws IOException {
+    return createHoodieAvroDataBlock(schema, records).getContentBytes(null);
+  }
+
+  /**
+   * Wraps a byte array in a seekable input stream. The bytes are first exposed through a
+   * ByteBufferBackedInputStream, which is then wrapped in a ByteArraySeekableDataInputStream.
+   *
+   * @param content The bytes the returned stream reads from.
+   * @return A SeekableDataInputStream over {@code content}.
+   * @throws IOException If an I/O error occurs while creating the stream.
+   */
+  private static SeekableDataInputStream createSeekableDataInputStream(byte[] content) throws IOException {
+    ByteBufferBackedInputStream backingStream = new ByteBufferBackedInputStream(content);
+    return new ByteArraySeekableDataInputStream(backingStream);
+  }
+
+  /**
+   * Selects a random subset of HoodieRecord objects from the provided list, mimicking how a
+   * key filter would match them.
+   *
+   * <p>The number of indices to sample is drawn uniformly from {@code [0, records.size())}.
+   * Indices are then drawn with replacement, so the set of sampled keys may be smaller than that
+   * number.
+   *
+   * @param records The list of HoodieRecord objects to sample from.
+   * @param fullKey Controls how records are matched against the sampled keys.
+   *                If true, a record is kept when its key equals a sampled key. If false, a
+   *                record is kept when its key starts with any sampled key; with keys such as
+   *                {@code key_1} and {@code key_10}, sampling the former also selects the latter.
+   * @return The matching records, in their original order; empty if {@code records} is empty.
+   */
+  private static List<HoodieRecord> selectRandomRecords(List<HoodieRecord> records, boolean fullKey) {
+    if (records.isEmpty()) {
+      // Random#nextInt(0) throws IllegalArgumentException, and there is nothing to sample anyway
+      return new ArrayList<>();
+    }
+    Random random = new Random();
+    // number of sampled indices
+    int count = random.nextInt(records.size());
+    // set of record keys of the sampled records
+    Set<String> keys = random
+        .ints(count, 0, records.size())
+        .mapToObj(records::get)
+        .map(r -> r.getRecordKey(SCHEMA, RECORD_KEY_FIELD))
+        .collect(Collectors.toSet());
+
+    // simulate KeyFilter matching logic
+    return records.stream()
+        .filter(r -> fullKey
+            ? keys.contains(r.getRecordKey(SCHEMA, RECORD_KEY_FIELD))
+            : keys.stream().anyMatch(r.getRecordKey(SCHEMA, RECORD_KEY_FIELD)::startsWith))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Verifies that the actual records match the expected records, ignoring their order.
+   * Both lists are converted to GenericRecord payloads and sorted by record key. The sorted lists
+   * are then compared position by position.
+   *
+   * @param expectedRecords The list of expected HoodieRecord objects.
+   * @param actualRecords   The list of actual HoodieRecord objects to verify.
+   * @throws AssertionError If the record counts differ, or if the records at some sorted
+   *                        position are not equal (the message names that position).
+   */
+  private static void verifyRecords(List<HoodieRecord> expectedRecords, List<HoodieRecord> actualRecords) {
+    List<GenericRecord> expectedGenericRecords = convertAndSortRecords(expectedRecords);
+    List<GenericRecord> actualGenericRecords = convertAndSortRecords(actualRecords);
+
+    assertEquals(expectedGenericRecords.size(), actualGenericRecords.size(), "Record count mismatch");
+    // compare one pair per iteration so a failure pinpoints the offending position
+    for (int i = 0; i < expectedGenericRecords.size(); i++) {
+      assertEquals(expectedGenericRecords.get(i), actualGenericRecords.get(i),
+          "Record data mismatch at index " + i);
+    }
+  }
+
+  /**
+   * Unwraps each HoodieRecord's payload as a GenericRecord. The payloads are returned ordered by
+   * the string form of their {@code RECORD_KEY_FIELD} value.
+   *
+   * @param hoodieRecords The records to convert.
+   * @return A new list of GenericRecord payloads sorted by record key.
+   */
+  private static List<GenericRecord> convertAndSortRecords(List<HoodieRecord> hoodieRecords) {
+    Comparator<GenericRecord> byRecordKey = Comparator.comparing(rec -> rec.get(RECORD_KEY_FIELD).toString());
+    return hoodieRecords.stream()
+        .map(hoodieRecord -> (GenericRecord) hoodieRecord.getData())
+        .sorted(byRecordKey)
+        .collect(Collectors.toList());
+  }
+}


Reply via email to