This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 81ba19813df3 [HUDI-9626] Prefetch HFiles for metadata table if size is below a configured threshold (#13567)
81ba19813df3 is described below

commit 81ba19813df3ccea1e8b44f90dc1d4b31c0d7d2a
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Tue Jul 29 04:23:47 2025 -0700

    [HUDI-9626] Prefetch HFiles for metadata table if size is below a configured threshold (#13567)
    
    * add a new write config;
    * Add HFileReaderFactory;
    * Add test for the factory class.
    
    ---------
    
    Co-authored-by: rmahindra123 <[email protected]>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/common/config/HoodieMetadataConfig.java   |  12 ++
 .../apache/hudi/io/storage/HFileReaderFactory.java | 123 ++++++++++++
 .../io/storage/HoodieNativeAvroHFileReader.java    |  56 ++----
 .../hudi/io/storage/TestHFileReaderFactory.java    | 215 +++++++++++++++++++++
 .../storage/TestHoodieNativeAvroHFileReader.java   |   9 +-
 .../io/hadoop/HoodieAvroFileReaderFactory.java     |  11 +-
 .../io/hadoop/TestHoodieHFileReaderWriter.java     |  13 +-
 7 files changed, 391 insertions(+), 48 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 66f89e125097..19d18157eee3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -496,6 +496,14 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .withDocumentation("Default number of partitions to use when 
repartitioning is needed. "
           + "This provides a reasonable level of parallelism for metadata 
table operations.");
 
+  public static final ConfigProperty<Integer> METADATA_FILE_CACHE_MAX_SIZE_MB 
= ConfigProperty
+      .key(METADATA_PREFIX + ".file.cache.max.size.mb")
+      .defaultValue(0)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Max size in MB below which metadata file (HFile) 
will be downloaded "
+          + "and cached entirely for the HFileReader.");
+
   public long getMaxLogFileSize() {
     return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
   }
@@ -729,6 +737,10 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return getInt(REPARTITION_DEFAULT_PARTITIONS);
   }
 
+  public int getFileCacheMaxSizeMB() {
+    return getInt(METADATA_FILE_CACHE_MAX_SIZE_MB);
+  }
+
   /**
    * Checks if a specific metadata index is marked for dropping based on the 
metadata configuration.
    * NOTE: Only applicable for secondary indexes (SI) or expression indexes 
(EI).
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
new file mode 100644
index 000000000000..665c38c14c62
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Either;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.io.hfile.HFileReader;
+import org.apache.hudi.io.hfile.HFileReaderImpl;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import java.io.IOException;
+
+/**
+ * Factory class to provide the implementation for
+ * the HFile Reader for {@link HoodieNativeAvroHFileReader}.
+ */
+public class HFileReaderFactory {
+
+  private final HoodieStorage storage;
+  private final HoodieMetadataConfig metadataConfig;
+  private final Either<StoragePath, byte[]> fileSource;
+
+  public HFileReaderFactory(HoodieStorage storage,
+                            TypedProperties properties,
+                            Either<StoragePath, byte[]> fileSource) {
+    this.storage = storage;
+    this.metadataConfig = 
HoodieMetadataConfig.newBuilder().withProperties(properties).build();
+    this.fileSource = fileSource;
+  }
+
+  public HFileReader createHFileReader() throws IOException {
+    final long fileSize = determineFileSize();
+    final SeekableDataInputStream inputStream = createInputStream(fileSize);
+    return new HFileReaderImpl(inputStream, fileSize);
+  }
+
+  private long determineFileSize() throws IOException {
+    if (fileSource.isLeft()) {
+      return storage.getPathInfo(fileSource.asLeft()).getLength();
+    }
+    return fileSource.asRight().length;
+  }
+
+  private SeekableDataInputStream createInputStream(long fileSize) throws 
IOException {
+    if (fileSource.isLeft()) {
+      if (fileSize <= (long) metadataConfig.getFileCacheMaxSizeMB() * 1024L * 
1024L) {
+        // Download the whole file if the file size is below a configured 
threshold
+        StoragePath path = fileSource.asLeft();
+        byte[] buffer;
+        try (SeekableDataInputStream stream = storage.openSeekable(path, 
false)) {
+          buffer = new byte[(int) storage.getPathInfo(path).getLength()];
+          stream.readFully(buffer);
+        }
+        return new ByteArraySeekableDataInputStream(new 
ByteBufferBackedInputStream(buffer));
+      }
+      return storage.openSeekable(fileSource.asLeft(), false);
+    }
+    return new ByteArraySeekableDataInputStream(new 
ByteBufferBackedInputStream(fileSource.asRight()));
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private HoodieStorage storage;
+    private Option<TypedProperties> properties = Option.empty();
+    private Either<StoragePath, byte[]> fileSource;
+
+    public Builder withStorage(HoodieStorage storage) {
+      this.storage = storage;
+      return this;
+    }
+
+    public Builder withProps(TypedProperties props) {
+      this.properties = Option.of(props);
+      return this;
+    }
+
+    public Builder withPath(StoragePath path) {
+      ValidationUtils.checkState(fileSource == null, "HFile source already 
set, cannot set path");
+      this.fileSource = Either.left(path);
+      return this;
+    }
+
+    public Builder withContent(byte[] bytesContent) {
+      ValidationUtils.checkState(fileSource == null, "HFile source already 
set, cannot set bytes content");
+      this.fileSource = Either.right(bytesContent);
+      return this;
+    }
+
+    public HFileReaderFactory build() {
+      ValidationUtils.checkArgument(storage != null, "Storage cannot be null");
+      ValidationUtils.checkArgument(fileSource != null, "HFile source cannot 
be null");
+      TypedProperties props = properties.isPresent() ? properties.get() : new 
TypedProperties();
+      return new HFileReaderFactory(storage, props, fileSource);
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index 94c8316fb030..c1bf11ee85e5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -29,19 +29,14 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.expression.Expression;
 import org.apache.hudi.expression.Predicate;
 import org.apache.hudi.expression.Predicates;
-import org.apache.hudi.io.ByteArraySeekableDataInputStream;
-import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.hfile.HFileReader;
-import org.apache.hudi.io.hfile.HFileReaderImpl;
 import org.apache.hudi.io.hfile.KeyValue;
 import org.apache.hudi.io.hfile.UTF8StringKey;
-import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.Lazy;
 
@@ -79,27 +74,17 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
   private static final Set<String> PRELOADED_META_INFO_KEYS = new HashSet<>(
       Arrays.asList(KEY_MIN_RECORD, KEY_MAX_RECORD, SCHEMA_KEY));
 
-  private final HoodieStorage storage;
-  private final Option<StoragePath> path;
-  private final Option<byte[]> bytesContent;
+  private final HFileReaderFactory readerFactory;
+  private final StoragePath path;
   // In-memory cache for meta info
   private final Map<String, byte[]> metaInfoMap;
   private final Lazy<Schema> schema;
   private boolean isMetaInfoLoaded = false;
   private long numKeyValueEntries = -1L;
 
-  public HoodieNativeAvroHFileReader(HoodieStorage storage, StoragePath path, 
Option<Schema> schemaOption) {
-    this.storage = storage;
-    this.path = Option.of(path);
-    this.bytesContent = Option.empty();
-    this.metaInfoMap = new HashMap<>();
-    this.schema = schemaOption.map(Lazy::eagerly).orElseGet(() -> 
Lazy.lazily(this::fetchSchema));
-  }
-
-  public HoodieNativeAvroHFileReader(HoodieStorage storage, byte[] content, 
Option<Schema> schemaOption) {
-    this.storage = storage;
-    this.path = Option.empty();
-    this.bytesContent = Option.of(content);
+  public HoodieNativeAvroHFileReader(HFileReaderFactory readerFactory, 
StoragePath path, Option<Schema> schemaOption) {
+    this.readerFactory = readerFactory;
+    this.path = path;
     this.metaInfoMap = new HashMap<>();
     this.schema = schemaOption.map(Lazy::eagerly).orElseGet(() -> 
Lazy.lazily(this::fetchSchema));
   }
@@ -113,7 +98,7 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
           "Schema projections are not supported in HFile reader");
     }
 
-    HFileReader reader = newHFileReader();
+    HFileReader reader = readerFactory.createHFileReader();
     return new RecordIterator(reader, getSchema(), readerSchema);
   }
 
@@ -130,7 +115,7 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
 
   @Override
   public BloomFilter readBloomFilter() {
-    try (HFileReader reader = newHFileReader()) {
+    try (HFileReader reader = readerFactory.createHFileReader()) {
       ByteBuffer byteBuffer = 
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK).get();
       return BloomFilterFactory.fromByteBuffer(byteBuffer,
           fromUTF8Bytes(reader.getMetaInfo(new 
UTF8StringKey(KEY_BLOOM_FILTER_TYPE_CODE)).get()));
@@ -141,7 +126,7 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
 
   @Override
   public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
-    try (HFileReader reader = newHFileReader()) {
+    try (HFileReader reader = readerFactory.createHFileReader()) {
       reader.seekTo();
       // candidateRowKeys must be sorted
       return new TreeSet<>(candidateRowKeys).stream()
@@ -163,7 +148,7 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
 
   @Override
   public ClosableIterator<String> getRecordKeyIterator() throws IOException {
-    HFileReader reader = newHFileReader();
+    HFileReader reader = readerFactory.createHFileReader();
     return new ClosableIterator<String>() {
       @Override
       public boolean hasNext() {
@@ -220,7 +205,7 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
   @Override
   public ClosableIterator<HoodieRecord<IndexedRecord>> 
getRecordsByKeysIterator(
       List<String> sortedKeys, Schema schema) throws IOException {
-    HFileReader reader = newHFileReader();
+    HFileReader reader = readerFactory.createHFileReader();
     ClosableIterator<IndexedRecord> iterator =
         new RecordByKeyIterator(reader, sortedKeys, getSchema(), schema);
     return new CloseableMappingIterator<>(
@@ -230,7 +215,7 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
   @Override
   public ClosableIterator<HoodieRecord<IndexedRecord>> 
getRecordsByKeyPrefixIterator(
       List<String> sortedKeyPrefixes, Schema schema) throws IOException {
-    HFileReader reader = newHFileReader();
+    HFileReader reader = readerFactory.createHFileReader();
     ClosableIterator<IndexedRecord> iterator =
         new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes, getSchema(), 
schema);
     return new CloseableMappingIterator<>(
@@ -310,7 +295,7 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
   private synchronized void loadAllMetaInfoIntoCacheIfNeeded() throws 
IOException {
     if (!isMetaInfoLoaded) {
       // Load all meta info that are small into cache
-      try (HFileReader reader = newHFileReader()) {
+      try (HFileReader reader = readerFactory.createHFileReader()) {
         this.numKeyValueEntries = reader.getNumKeyValueEntries();
         for (String metaInfoKey : PRELOADED_META_INFO_KEYS) {
           Option<byte[]> metaInfo = reader.getMetaInfo(new 
UTF8StringKey(metaInfoKey));
@@ -325,29 +310,16 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
     }
   }
 
-  private HFileReader newHFileReader() throws IOException {
-    SeekableDataInputStream inputStream;
-    long fileSize;
-    if (path.isPresent()) {
-      fileSize = storage.getPathInfo(path.get()).getLength();
-      inputStream = storage.openSeekable(path.get(), false);
-    } else {
-      fileSize = bytesContent.get().length;
-      inputStream = new ByteArraySeekableDataInputStream(new 
ByteBufferBackedInputStream(bytesContent.get()));
-    }
-    return new HFileReaderImpl(inputStream, fileSize);
-  }
-
   public ClosableIterator<IndexedRecord> 
getIndexedRecordsByKeysIterator(List<String> sortedKeys,
                                                                          
Schema readerSchema) throws IOException {
-    HFileReader reader = newHFileReader();
+    HFileReader reader = readerFactory.createHFileReader();
     return new RecordByKeyIterator(reader, sortedKeys, getSchema(), 
readerSchema);
   }
 
   @Override
   public ClosableIterator<IndexedRecord> 
getIndexedRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes,
                                                                               
Schema readerSchema) throws IOException {
-    HFileReader reader = newHFileReader();
+    HFileReader reader = readerFactory.createHFileReader();
     return new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes, 
getSchema(), readerSchema);
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHFileReaderFactory.java
 
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHFileReaderFactory.java
new file mode 100644
index 000000000000..a0a98575d4cd
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHFileReaderFactory.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.io.hfile.HFileReader;
+import org.apache.hudi.io.hfile.HFileReaderImpl;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestHFileReaderFactory {
+
+  @Mock
+  private HoodieStorage mockStorage;
+
+  @Mock
+  private StoragePath mockPath;
+
+  @Mock
+  private StoragePathInfo mockPathInfo;
+
+  @Mock
+  private SeekableDataInputStream mockInputStream;
+
+  private TypedProperties properties;
+  private final byte[] testContent = "test content".getBytes();
+
+  @BeforeEach
+  void setUp() {
+    properties = new TypedProperties();
+  }
+
+  @Test
+  void testCreateHFileReader_FileSizeBelowThreshold_ShouldUseContentCache() 
throws IOException {
+    final long fileSizeBelow = 5L; // 5 bytes - below default threshold
+    final int thresholdMB = 10; // 10MB threshold
+
+    
properties.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
 String.valueOf(thresholdMB));
+
+    when(mockStorage.getPathInfo(mockPath)).thenReturn(mockPathInfo);
+    when(mockPathInfo.getLength()).thenReturn(fileSizeBelow);
+    when(mockStorage.openSeekable(mockPath, 
false)).thenReturn(mockInputStream);
+    doAnswer(invocation -> {
+      byte[] buffer = invocation.getArgument(0);
+      System.arraycopy(testContent, 0, buffer, 0, Math.min(testContent.length, 
buffer.length));
+      return null;
+    }).when(mockInputStream).readFully(any(byte[].class));
+
+    HFileReaderFactory factory = HFileReaderFactory.builder()
+        .withStorage(mockStorage)
+        .withProps(properties)
+        .withPath(mockPath)
+        .build();
+    HFileReader result = factory.createHFileReader();
+
+    assertNotNull(result);
+    assertInstanceOf(HFileReaderImpl.class, result);
+
+    // Verify that content was downloaded (cache was used)
+    verify(mockStorage, times(2)).getPathInfo(mockPath); // Once for size 
determination, once for download
+    verify(mockStorage, times(1)).openSeekable(mockPath, false); // For 
content download
+    verify(mockInputStream, times(1)).readFully(any(byte[].class));
+  }
+
+  @Test
+  void testCreateHFileReader_FileSizeAboveThreshold_ShouldNotUseContentCache() 
throws IOException {
+    final long fileSizeAbove = 15L * 1024L * 1024L; // 15MB - above 10MB 
threshold
+    final int thresholdMB = 10; // 10MB threshold
+
+    
properties.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
 String.valueOf(thresholdMB));
+
+    when(mockStorage.getPathInfo(mockPath)).thenReturn(mockPathInfo);
+    when(mockPathInfo.getLength()).thenReturn(fileSizeAbove);
+    when(mockStorage.openSeekable(mockPath, 
false)).thenReturn(mockInputStream);
+
+    HFileReaderFactory factory = HFileReaderFactory.builder()
+        .withStorage(mockStorage)
+        .withProps(properties)
+        .withPath(mockPath)
+        .build();
+    HFileReader result = factory.createHFileReader();
+
+    assertNotNull(result);
+    assertInstanceOf(HFileReaderImpl.class, result);
+
+    // Verify that content was NOT downloaded (cache was not used)
+    verify(mockStorage, times(1)).getPathInfo(mockPath); // Only once for size 
determination
+    verify(mockStorage, times(1)).openSeekable(mockPath, false); // For 
creating input stream directly
+    verify(mockInputStream, never()).readFully(any(byte[].class)); // Content 
not downloaded
+  }
+
+  @Test
+  void 
testCreateHFileReader_ContentProvidedInConstructor_ShouldUseProvidedContent() 
throws IOException {
+    final int thresholdMB = 10; // 10MB threshold
+
+    
properties.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
 String.valueOf(thresholdMB));
+
+    HFileReaderFactory factory = HFileReaderFactory.builder()
+        .withStorage(mockStorage)
+        .withProps(properties)
+        .withContent(testContent)
+        .build();
+    HFileReader result = factory.createHFileReader();
+
+    assertNotNull(result);
+    assertInstanceOf(HFileReaderImpl.class, result);
+
+    // Verify that storage was never accessed since content was provided
+    verify(mockStorage, never()).getPathInfo(any());
+    verify(mockStorage, never()).openSeekable(any(), anyBoolean());
+  }
+
+  @Test
+  void testCreateHFileReader_ContentProvidedAndPathProvided_ShouldFail() 
throws IOException {
+    final int thresholdMB = 10;
+
+    
properties.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
 String.valueOf(thresholdMB));
+
+    IllegalStateException exception = 
Assertions.assertThrows(IllegalStateException.class, () -> 
HFileReaderFactory.builder()
+        .withStorage(mockStorage)
+        .withProps(properties)
+        .withPath(mockPath)
+        .withContent(testContent)
+        .build());
+    assertEquals("HFile source already set, cannot set bytes content", 
exception.getMessage());
+
+    exception = Assertions.assertThrows(IllegalStateException.class, () -> 
HFileReaderFactory.builder()
+        .withStorage(mockStorage)
+        .withProps(properties)
+        .withContent(testContent)
+        .withPath(mockPath)
+        .build());
+    assertEquals("HFile source already set, cannot set path", 
exception.getMessage());
+  }
+
+  @Test
+  void testCreateHFileReader_NoPathOrContent_ShouldThrowException() {
+    IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> {
+      HFileReaderFactory.builder()
+          .withStorage(mockStorage)
+          .withProps(properties)
+          .build();
+    });
+    assertEquals("HFile source cannot be null", exception.getMessage());
+  }
+
+  @Test
+  void testBuilder_WithNullStorage_ShouldThrowException() {
+    IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> {
+      HFileReaderFactory.builder()
+          .withStorage(null)
+          .withPath(mockPath)
+          .build();
+    });
+    assertEquals("Storage cannot be null", exception.getMessage());
+  }
+
+  @Test
+  void testBuilder_WithoutPropertiesProvided_ShouldUseDefaultProperties() 
throws IOException {
+    when(mockStorage.getPathInfo(mockPath)).thenReturn(mockPathInfo);
+    when(mockPathInfo.getLength()).thenReturn(1024L);
+    when(mockStorage.openSeekable(mockPath, 
false)).thenReturn(mockInputStream);
+
+    // Not providing properties, should use defaults
+    HFileReaderFactory factory = HFileReaderFactory.builder()
+        .withStorage(mockStorage)
+        .withPath(mockPath)
+        .build();
+
+    HFileReader result = factory.createHFileReader();
+    assertNotNull(result);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReader.java
 
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReader.java
index 7b25dfa3f8cd..9d90ea298ad3 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReader.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.expression.Expression;
 import org.apache.hudi.expression.Predicate;
@@ -38,13 +39,17 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 class TestHoodieNativeAvroHFileReader {
+
+  private static final TypedProperties DEFAULT_PROPS = new TypedProperties();
   private static HoodieNativeAvroHFileReader reader;
 
   TestHoodieNativeAvroHFileReader() {
     HoodieStorage storage = mock(HoodieStorage.class);
     StoragePath path = new StoragePath("anyPath");
-    reader = new HoodieNativeAvroHFileReader(
-        storage, path, Option.empty());
+    HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+        .withStorage(storage).withProps(DEFAULT_PROPS)
+        .withPath(path).build();
+    reader = new HoodieNativeAvroHFileReader(readerFactory, path, 
Option.empty());
   }
 
   @Test
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
index 21f43bc26aa2..3cf3ed8a4818 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.HFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -47,7 +48,10 @@ public class HoodieAvroFileReaderFactory extends 
HoodieFileReaderFactory {
   protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig,
                                                 StoragePath path,
                                                 Option<Schema> schemaOption) 
throws IOException {
-    return new HoodieNativeAvroHFileReader(storage, path, schemaOption);
+    HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+        .withStorage(storage).withProps(hoodieConfig.getProps())
+        .withPath(path).build();
+    return new HoodieNativeAvroHFileReader(readerFactory, path, schemaOption);
   }
 
   @Override
@@ -56,7 +60,10 @@ public class HoodieAvroFileReaderFactory extends 
HoodieFileReaderFactory {
                                                 HoodieStorage storage,
                                                 byte[] content,
                                                 Option<Schema> schemaOption) 
throws IOException {
-    return new HoodieNativeAvroHFileReader(this.storage, content, 
schemaOption);
+    HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+        .withStorage(storage).withProps(hoodieConfig.getProps())
+        .withContent(content).build();
+    return new HoodieNativeAvroHFileReader(readerFactory, path, schemaOption);
   }
 
   @Override
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index 8af1f6b49bfc..260d98a8b64b 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -31,6 +32,7 @@ import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -87,6 +89,7 @@ import static org.mockito.Mockito.when;
 
 public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
   protected static final int NUM_RECORDS_FIXTURE = 50;
+  protected static final TypedProperties DEFAULT_PROPS = new TypedProperties();
 
   protected static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
     return Arrays.stream(new Boolean[][] {
@@ -100,12 +103,18 @@ public class TestHoodieHFileReaderWriter extends 
TestHoodieReaderWriterBase {
   @Override
   protected HoodieAvroFileReader createReader(
       HoodieStorage storage) throws Exception {
-    return new HoodieNativeAvroHFileReader(storage, getFilePath(), 
Option.empty());
+    HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+        .withStorage(storage).withProps(DEFAULT_PROPS)
+        .withPath(getFilePath()).build();
+    return new HoodieNativeAvroHFileReader(readerFactory, getFilePath(), 
Option.empty());
   }
 
   protected HoodieAvroHFileReaderImplBase createHFileReader(HoodieStorage 
storage,
                                                             byte[] content) 
throws IOException {
-    return new HoodieNativeAvroHFileReader(storage, content, Option.empty());
+    HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+        .withStorage(storage).withProps(DEFAULT_PROPS)
+        .withContent(content).build();
+    return new HoodieNativeAvroHFileReader(readerFactory, getFilePath(), 
Option.empty());
   }
 
   protected void verifyHFileReader(byte[] content,

Reply via email to