This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b0fa11424d37 fix: Fixing point lookup in MDT partitions (#14085)
b0fa11424d37 is described below

commit b0fa11424d379c7226999a4fb2a1a8922f8812b9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Oct 15 14:07:43 2025 -0700

    fix: Fixing point lookup in MDT partitions (#14085)
    
    
    ---------
    
    Co-authored-by: Lin Liu <[email protected]>
---
 .../HoodieLogCompactionPlanGenerator.java          |   1 -
 .../table/log/BaseHoodieLogRecordReader.java       |  18 ++-
 .../table/log/HoodieLogBlockMetadataScanner.java   |   5 +-
 .../table/log/HoodieMergedLogRecordReader.java     |  18 +--
 .../common/table/log/block/HoodieDataBlock.java    |  22 ++--
 .../table/log/block/HoodieHFileDataBlock.java      |  27 ++++-
 .../table/read/buffer/FileGroupRecordBuffer.java   |  20 ++--
 .../io/storage/HoodieNativeAvroHFileReader.java    |  20 +++-
 .../hudi/io/storage/HoodieSeekingFileReader.java   |   8 ++
 .../io/hadoop/TestHoodieHFileReaderWriter.java     | 133 +++++++++++++++++++++
 .../TestHoodieClientOnMergeOnReadStorage.java      |   2 -
 11 files changed, 226 insertions(+), 48 deletions(-)
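
In brief, this commit threads HoodieLogFile objects (rather than raw path strings)
through the log record readers, and replaces the old scan-everything-then-filter
behavior of HoodieDataBlock.getEngineRecordIterator with a point-lookup hook
(lookupEngineRecords) that HoodieHFileDataBlock implements by seeking directly into
the inlined HFile. A minimal caller-side sketch of the new reader API follows; the
helper class, the `reader` handle, and the import paths are illustrative assumptions,
not part of this commit:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.util.collection.ClosableIterator;
    import org.apache.hudi.io.storage.HoodieSeekingFileReader;

    // Hypothetical usage sketch: exercising the new engine-level point lookup.
    class PointLookupSketch {
      static void lookup(HoodieSeekingFileReader<IndexedRecord> reader, Schema schema) throws IOException {
        // Keys must be pre-sorted; the HFile iterators expect sorted input
        // (see the sorted-order comments in the tests below).
        List<String> sortedKeys = Arrays.asList("key05", "key12");
        try (ClosableIterator<IndexedRecord> it =
                 reader.getEngineRecordsByKeysIterator(sortedKeys, schema)) {
          while (it.hasNext()) {
            IndexedRecord record = it.next(); // engine-native record, no HoodieRecord wrapper
          }
        }
      }
    }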

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 4739e9cfff51..3d704d85c7b5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -102,7 +102,6 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I,
     }
     HoodieLogBlockMetadataScanner scanner = new HoodieLogBlockMetadataScanner(metaClient, fileSlice.getLogFiles()
         .sorted(HoodieLogFile.getLogFileComparator())
-        .map(file -> file.getPath().toString())
         .collect(Collectors.toList()),
         writeConfig.getMaxDFSStreamBufferSize(),
         maxInstantTime,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 4bb3454fbda9..b471f5089369 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -41,7 +41,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -98,7 +97,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
   protected final String orderingFields;
   private final TypedProperties payloadProps;
   // Log File Paths
-  protected final List<String> logFilePaths;
+  protected final List<HoodieLogFile> logFiles;
   // Reverse reader - Not implemented yet (NA -> Why do we need ?)
   // but present here for plumbing for future implementation
   private final boolean reverseReader;
@@ -139,7 +138,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
   // table version for compatibility
   private final HoodieTableVersion tableVersion;
 
-  protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage, List<String> logFilePaths,
+  protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage,
+                                      List<HoodieLogFile> logFiles,
                                       boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
                                       boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride,
                                       Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieFileGroupRecordBuffer<T> recordBuffer,
@@ -158,8 +158,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
       props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.orderingFields);
     }
     this.payloadProps = props;
-    this.totalLogFiles.addAndGet(logFilePaths.size());
-    this.logFilePaths = logFilePaths;
+    this.totalLogFiles.addAndGet(logFiles.size());
+    this.logFiles = logFiles;
     this.reverseReader = reverseReader;
     this.storage = storage;
     this.bufferSize = bufferSize;
@@ -219,8 +219,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
     HoodieLogFormatReader logFormatReaderWrapper = null;
     try {
       // Iterate over the paths
-      logFormatReaderWrapper = new HoodieLogFormatReader(storage,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new StoragePath(logFile))).collect(Collectors.toList()),
+      logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
           readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
@@ -511,8 +510,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
     HoodieLogFormatReader logFormatReaderWrapper = null;
     try {
       // Iterate over the paths
-      logFormatReaderWrapper = new HoodieLogFormatReader(storage,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new StoragePath(logFile))).collect(Collectors.toList()),
+      logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
           readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       /**
@@ -745,7 +743,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
       }
     }
     // At this step the lastBlocks are consumed. We track approximate progress by number of log-files seen
-    progress = (float) (numLogFilesSeen - 1) / logFilePaths.size();
+    progress = (float) (numLogFilesSeen - 1) / logFiles.size();
   }
 
   private boolean shouldLookupRecords() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
index 14d65d1aa6a3..23d64eed88cb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 
@@ -32,8 +33,8 @@ import java.util.List;
  */
 public class HoodieLogBlockMetadataScanner extends BaseHoodieLogRecordReader<IndexedRecord> {
 
-  public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<String> logFilePaths, int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) {
-    super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFilePaths, false, bufferSize, instantRange, false, false, Option.empty(), Option.empty(), true,
+  public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<HoodieLogFile> logFiles, int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) {
+    super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFiles, false, bufferSize, instantRange, false, false, Option.empty(), Option.empty(), true,
         null, false);
     scanInternal(Option.empty(), true);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index c861f6fc2463..4ca30827fb8b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -63,11 +63,12 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
   private long totalTimeTakenToReadAndMergeBlocks;
 
   @SuppressWarnings("unchecked")
-  private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage, List<String> logFilePaths, boolean reverseReader,
+  private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage,
+                                      List<HoodieLogFile> logFiles, boolean reverseReader,
                                       int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan,
                                       Option<String> partitionName, Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan,
                                       HoodieFileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) {
-    super(readerContext, metaClient, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField,
+    super(readerContext, metaClient, storage, logFiles, reverseReader, bufferSize, instantRange, withOperationField,
         forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
 
     if (forceFullScan) {
@@ -101,7 +102,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
     this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
     this.numMergedRecordsInLog = recordBuffer.size();
 
-    LOG.info("Number of log files scanned => {}", logFilePaths.size());
+    LOG.info("Number of log files scanned => {}", logFiles.size());
     LOG.info("Number of entries in Map => {}", recordBuffer.size());
   }
 
@@ -159,7 +160,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
   public static class Builder<T> extends BaseHoodieLogRecordReader.Builder<T> {
     private HoodieReaderContext<T> readerContext;
     private HoodieStorage storage;
-    private List<String> logFilePaths;
+    private List<HoodieLogFile> logFiles;
     private boolean reverseReader;
     private int bufferSize;
     // specific configurations
@@ -192,9 +193,8 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
 
     @Override
     public Builder<T> withLogFiles(List<HoodieLogFile> hoodieLogFiles) {
-      this.logFilePaths = hoodieLogFiles.stream()
+      this.logFiles = hoodieLogFiles.stream()
           .filter(l -> !l.isCDC())
-          .map(l -> l.getPath().toString())
           .collect(Collectors.toList());
       return this;
     }
@@ -263,13 +263,13 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
     public HoodieMergedLogRecordReader<T> build() {
       ValidationUtils.checkArgument(recordBuffer != null, "Record Buffer is null in Merged Log Record Reader");
       ValidationUtils.checkArgument(readerContext != null, "Reader Context is null in Merged Log Record Reader");
-      if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) {
+      if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFiles)) {
         this.partitionName = getRelativePartitionPath(
-            new StoragePath(readerContext.getTablePath()), new StoragePath(this.logFilePaths.get(0)).getParent());
+            new StoragePath(readerContext.getTablePath()), logFiles.get(0).getPath().getParent());
       }
 
       return new HoodieMergedLogRecordReader<>(
-          readerContext, metaClient, storage, logFilePaths,
+          readerContext, metaClient, storage, logFiles,
           reverseReader, bufferSize, instantRange,
           withOperationField, forceFullScan,
           Option.ofNullable(partitionName),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 084909860282..625c678da6bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -260,19 +260,13 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
    * @param <T>           The type of engine-specific record representation to return.
    * @return An iterator containing the records of interest in specified type.
    */
-  public final <T> ClosableIterator<T> getEngineRecordIterator(HoodieReaderContext<T> readerContext, List<String> keys, boolean fullKey) {
+  public final <T> ClosableIterator<T> getEngineRecordIterator(HoodieReaderContext<T> readerContext, List<String> keys, boolean fullKey) throws IOException {
     boolean fullScan = keys.isEmpty();
-
-    // Otherwise, we fetch all the records and filter out all the records, but the
-    // ones requested
-    ClosableIterator<T> allRecords = getEngineRecordIterator(readerContext);
-    if (fullScan) {
-      return allRecords;
+    if (!fullScan) {
+      return lookupEngineRecords(keys, fullKey);
+    } else {
+      throw new IllegalStateException("Unexpected code reached. Expected to be called only with keySpec defined for non FILES partition in Metadata table");
     }
-
-    HashSet<String> keySet = new HashSet<>(keys);
-    return FilteringEngineRecordIterator.getInstance(allRecords, keySet, fullKey, record ->
-        Option.of(readerContext.getRecordContext().getRecordKey(record, readerSchema)));
   }
 
   protected <T> ClosableIterator<HoodieRecord<T>> readRecordsFromBlockPayload(HoodieRecordType type) throws IOException {
@@ -329,6 +323,12 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     );
   }
 
+  protected <T> ClosableIterator<T> lookupEngineRecords(List<String> keys, boolean fullKey) throws IOException {
+    throw new UnsupportedOperationException(
+        String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
+    );
+  }
+
   protected abstract ByteArrayOutputStream serializeRecords(List<HoodieRecord> records, HoodieStorage storage) throws IOException;
 
   protected abstract <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] content, HoodieRecordType type) throws IOException;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 0ee914c32dc5..494cd02d781e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -173,8 +173,33 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
       // Get writer's schema from the header
       final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
           fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
-
       return new CloseableMappingIterator<>(recordIterator, data -> (HoodieRecord<T>) data);
     }
   }
+
+  @Override
+  protected <T> ClosableIterator<T> lookupEngineRecords(List<String> sortedKeys, boolean fullKey) throws IOException {
+    HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
+
+    // NOTE: It's important to extend Hadoop configuration here to make sure configuration
+    //       is appropriately carried over
+    StorageConfiguration<?> inlineConf = getBlockContentLocation().get().getStorage().getConf().getInline();
+    StoragePath inlinePath = InLineFSUtils.getInlineFilePath(
+        blockContentLoc.getLogFile().getPath(),
+        blockContentLoc.getLogFile().getPath().toUri().getScheme(),
+        blockContentLoc.getContentPositionInLogFile(),
+        blockContentLoc.getBlockSize());
+    HoodieStorage inlineStorage = blockContentLoc.getStorage().newInstance(inlinePath, inlineConf);
+
+    try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase) HoodieIOFactory
+        .getIOFactory(inlineStorage)
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+            inlinePath,
+            HoodieFileFormat.HFILE,
+            Option.of(getSchemaFromHeader()))) {
+      // Get writer's schema from the header
+      return (ClosableIterator<T>) (fullKey ? reader.getEngineRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getEngineRecordsByKeyPrefixIterator(sortedKeys, readerSchema));
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
index d8d9d215ae07..5b3ff3bcae0d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
@@ -193,15 +193,19 @@ abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T
    */
   protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) {
     ClosableIterator<T> blockRecordsIterator;
-    if (keySpecOpt.isPresent()) {
-      KeySpec keySpec = keySpecOpt.get();
-      blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext, keySpec.getKeys(), keySpec.isFullKey());
-    } else {
-      blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
+    try {
+      if (keySpecOpt.isPresent()) {
+        KeySpec keySpec = keySpecOpt.get();
+        blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext, keySpec.getKeys(), keySpec.isFullKey());
+      } else {
+        blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
+      }
+      Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock);
+      return Pair.of(new CloseableMappingIterator<>(
+          blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()), schemaTransformerWithEvolvedSchema.getRight());
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to deser records from log files ", e);
     }
-    Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock);
-    return Pair.of(new CloseableMappingIterator<>(
-        blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()), schemaTransformerWithEvolvedSchema.getRight());
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index 7f361af6ca20..ce4a733428a5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -210,23 +210,35 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
   @Override
   public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(
       List<String> sortedKeys, Schema schema) throws IOException {
-    HFileReader reader = readerFactory.createHFileReader();
     ClosableIterator<IndexedRecord> iterator =
-        new RecordByKeyIterator(reader, sortedKeys, getSchema(), schema, useBloomFilter);
+        getEngineRecordsByKeysIterator(sortedKeys, schema);
     return new CloseableMappingIterator<>(
         iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
   }
 
+  @Override
+  public ClosableIterator<IndexedRecord> getEngineRecordsByKeysIterator(
+      List<String> sortedKeys, Schema schema) throws IOException {
+    HFileReader reader = readerFactory.createHFileReader();
+    return new RecordByKeyIterator(reader, sortedKeys, getSchema(), schema, useBloomFilter);
+  }
+
   @Override
   public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator(
       List<String> sortedKeyPrefixes, Schema schema) throws IOException {
-    HFileReader reader = readerFactory.createHFileReader();
     ClosableIterator<IndexedRecord> iterator =
-        new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes, getSchema(), schema);
+        getEngineRecordsByKeyPrefixIterator(sortedKeyPrefixes, schema);
     return new CloseableMappingIterator<>(
         iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
   }
 
+  @Override
+  public ClosableIterator<IndexedRecord> getEngineRecordsByKeyPrefixIterator(
+      List<String> sortedKeyPrefixes, Schema schema) throws IOException {
+    HFileReader reader = readerFactory.createHFileReader();
+    return new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes, getSchema(), schema);
+  }
+
   @Override
   public boolean supportKeyPredicate() {
     return true;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
index e7547d2f02cd..5b8dcea0fb02 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
@@ -33,6 +33,10 @@ public interface HoodieSeekingFileReader<T> extends HoodieFileReader<T> {
     throw new UnsupportedOperationException();
   }
 
+  default ClosableIterator<T> getEngineRecordsByKeysIterator(List<String> sortedKeys, Schema schema) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   default ClosableIterator<HoodieRecord<T>> getRecordsByKeysIterator(List<String> sortedKeys) throws IOException {
     return getRecordsByKeysIterator(sortedKeys, getSchema());
   }
@@ -41,6 +45,10 @@ public interface HoodieSeekingFileReader<T> extends HoodieFileReader<T> {
     throw new UnsupportedEncodingException();
   }
 
+  default ClosableIterator<T> getEngineRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes, Schema schema) throws IOException {
+    throw new UnsupportedEncodingException();
+  }
+
   default ClosableIterator<HoodieRecord<T>> getRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes) throws IOException {
     return getRecordsByKeyPrefixIterator(sortedKeyPrefixes, getSchema());
   }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index ee83e7c52999..a20303e6a3bc 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -394,6 +394,15 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
       // no entries should match since this is exact match.
       assertEquals(Collections.emptyList(), recordsByKeys);
 
+      iterator =
+          hfileReader.getEngineRecordsByKeysIterator(keys, avroSchema);
+
+      recordsByKeys =
+          toStream(iterator).map(r -> (GenericRecord) r).collect(Collectors.toList());
+
+      // no entries should match since this is exact match.
+      assertEquals(Collections.emptyList(), recordsByKeys);
+
       // filter for "key00001, key05, key12, key24, key2, key31, key49, key61, 
key50". Valid entries should be matched.
       // key00001 should not match.
       // key2 : we don't have an exact match
@@ -414,6 +423,16 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
               .map(r -> (GenericRecord) r)
               .collect(Collectors.toList());
       assertEquals(expectedKey1s, recordsByKeys);
+
+      iterator =
+          hfileReader.getEngineRecordsByKeysIterator(
+              Arrays.asList("key00001", "key05", "key12", "key24", "key31", 
"key49", "key61", "key50"),
+              avroSchema);
+      recordsByKeys =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey1s, recordsByKeys);
     }
     try (HoodieAvroHFileReaderImplBase hfileReader = getHFileReaderFromFixture(useBloomFilter)) {
       List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator())
@@ -431,6 +450,16 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              || (entry.get("_row_key").toString()).equals("key004958-abcdefghij")
              || (entry.get("_row_key").toString()).equals("key019889-abcdefghij"))).collect(Collectors.toList());
       assertEquals(expected, actual);
+
+      iterator = hfileReader.getEngineRecordsByKeysIterator(
+          Arrays.asList("key000000", "key000066-abcdefghij", "key001424-aa", 
"key001424-aaa",
+              "key004958-abcdefghij", "key010769", "key019889-abcdefghij", 
"key030000"),
+          avroSchema);
+      actual =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 
Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expected, actual);
     }
   }
 
@@ -454,6 +483,12 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
 
       assertEquals(allRecords, recordsByPrefix);
 
+      // test getEngineRecordsByKeyPrefix
+      iterator = hfileReader.getEngineRecordsByKeyPrefixIterator(keyPrefixes, avroSchema);
+      recordsByPrefix =
+          toStream(iterator).map(r -> (GenericRecord) r).collect(Collectors.toList());
+      assertEquals(allRecords, recordsByPrefix);
+
       // filter for "key1" : entries from key10 to key19 should be matched
       List<GenericRecord> expectedKey1s =
          allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key1"))
@@ -468,6 +503,17 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
               .collect(Collectors.toList());
       assertEquals(expectedKey1s, recordsByPrefix);
 
+      // test getEngineRecordsByKeyPrefix
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Collections.singletonList("key1"),
+              avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
+                  false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey1s, recordsByPrefix);
+
       // exact match
       List<GenericRecord> expectedKey25 =
          allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key25"))
@@ -480,6 +526,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
               .collect(Collectors.toList());
       assertEquals(expectedKey25, recordsByPrefix);
 
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Collections.singletonList("key25"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey25, recordsByPrefix);
+
       // no match. key prefix is beyond entries in file.
       iterator =
           hfileReader.getIndexedRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema);
@@ -489,6 +543,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
               .collect(Collectors.toList());
       assertEquals(Collections.emptyList(), recordsByPrefix);
 
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(Collections.emptyList(), recordsByPrefix);
+
       // no match. but keyPrefix is in between the entries found in file.
       iterator =
           hfileReader.getIndexedRecordsByKeyPrefixIterator(Collections.singletonList("key1234"), avroSchema);
@@ -497,6 +559,13 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
               .map(r -> (GenericRecord) r)
               .collect(Collectors.toList());
       assertEquals(Collections.emptyList(), recordsByPrefix);
+      iterator =
+          hfileReader.getEngineRecordsByKeysIterator(Collections.singletonList("key1234"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(Collections.emptyList(), recordsByPrefix);
 
       // filter for "key1", "key30" and "key60" : entries from 'key10 to 
key19' and 'key30' should be matched.
       List<GenericRecord> expectedKey50and1s =
@@ -510,6 +579,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
               .collect(Collectors.toList());
       assertEquals(expectedKey50and1s, recordsByPrefix);
 
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key1", "key30", "key6"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey50and1s, recordsByPrefix);
+
       // filter for "key50" and "key0" : entries from key50 and 'key00 to 
key09' should be matched.
       List<GenericRecord> expectedKey50and0s =
           allRecords.stream().filter(entry -> 
(entry.get("_row_key").toString()).contains("key0")
@@ -521,6 +598,13 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
               .map(r -> (GenericRecord) r)
               .collect(Collectors.toList());
       assertEquals(expectedKey50and0s, recordsByPrefix);
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key0", "key50"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey50and0s, recordsByPrefix);
 
       // filter for "key1" and "key0" : entries from 'key10 to key19' and 
'key00 to key09' should be matched.
       List<GenericRecord> expectedKey1sand0s = allRecords.stream()
@@ -541,6 +625,20 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
       });
       assertEquals(expectedKey1sand0s, recordsByPrefix);
 
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key0", "key1"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      Collections.sort(recordsByPrefix, new Comparator<GenericRecord>() {
+        @Override
+        public int compare(GenericRecord o1, GenericRecord o2) {
+          return o1.get("_row_key").toString().compareTo(o2.get("_row_key").toString());
+        }
+      });
+      assertEquals(expectedKey1sand0s, recordsByPrefix);
+
      // We expect the keys to be looked up in sorted order. If not, matching entries may not be returned.
       // key1 should have matching entries, but not key0.
       iterator =
@@ -556,6 +654,19 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
         }
       });
       assertEquals(expectedKey1s, recordsByPrefix);
+
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key1", "key0"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      Collections.sort(recordsByPrefix, new Comparator<GenericRecord>() {
+        @Override
+        public int compare(GenericRecord o1, GenericRecord o2) {
+          return o1.get("_row_key").toString().compareTo(o2.get("_row_key").toString());
+        }
+      });
     }
     try (HoodieAvroHFileReaderImplBase hfileReader = getHFileReaderFromFixture(useBloomFilter)) {
       List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator())
@@ -575,6 +686,17 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              || (entry.get("_row_key").toString()).contains("key010769")
              || (entry.get("_row_key").toString()).contains("key01988"))).collect(Collectors.toList());
       assertEquals(expected, actual);
+
+      iterator = hfileReader.getEngineRecordsByKeyPrefixIterator(
+          Arrays.asList("key000000", "key000066-abcdefghij", "key001424-aa", 
"key001424-aaa",
+              "key004958", "key010769", "key01988", "key030000"),
+          avroSchema);
+      actual =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 
Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expected, actual);
+
     }
   }
 
@@ -600,6 +722,17 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
                    Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
                 .collect(Collectors.toList()));
       }
+
+      try (ClosableIterator<IndexedRecord> iterator =
+               hfileReader.getEngineRecordsByKeysIterator(
+                   Arrays.asList("key00001", "key05", "key24", "key16", 
"key31", "key61"),
+                   avroSchema)) {
+        assertThrows(
+            IllegalStateException.class,
+            () -> StreamSupport.stream(
+                    Spliterators.spliteratorUnknownSize(iterator, 
Spliterator.ORDERED), false)
+                .collect(Collectors.toList()));
+      }
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index ede8e3c932e2..4ee8c34c935d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -487,7 +487,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
             table.getMetaClient(),
             slice.getLogFiles()
                 .sorted(HoodieLogFile.getLogFileComparator())
-                .map(file -> file.getPath().toString())
                 .collect(Collectors.toList()),
             config.getMaxDFSStreamBufferSize(),
             instant,
@@ -497,7 +496,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
             table.getMetaClient(),
             slice.getLogFiles()
                 .sorted(HoodieLogFile.getLogFileComparator())
-                .map(file -> file.getPath().toString())
                 .collect(Collectors.toList()),
             config.getMaxDFSStreamBufferSize(),
             currentInstant,

