This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 39d66455fee [HUDI-9316] Add support for creating iterator of HoodieRecord from FGReader (#13314)
39d66455fee is described below

commit 39d66455feea921016b77e4208c5c6ff33a37df0
Author: Tim Brown <[email protected]>
AuthorDate: Tue May 20 11:19:47 2025 -0500

    [HUDI-9316] Add support for creating iterator of HoodieRecord from FGReader (#13314)
---
 .../v2/FlinkFileGroupReaderBasedMergeHandle.java   |  4 +-
 ...HoodieSparkFileGroupReaderBasedMergeHandle.java |  5 +-
 .../hudi/BaseSparkInternalRowReaderContext.java    |  5 +-
 .../apache/hudi/avro/HoodieAvroReaderContext.java  |  6 +-
 .../hudi/common/engine/HoodieReaderContext.java    |  5 ++
 .../common/table/read/FileGroupRecordBuffer.java   | 22 +-----
 .../common/table/read/HoodieFileGroupReader.java   | 52 ++++++++++----
 .../table/read/KeyBasedFileGroupRecordBuffer.java  |  7 +-
 .../read/PositionBasedFileGroupRecordBuffer.java   |  7 +-
 .../table/read/UnmergedFileGroupRecordBuffer.java  |  4 +-
 .../table/read/TestFileGroupRecordBuffer.java      | 17 ++---
 .../table/read/TestHoodieFileGroupReaderBase.java  | 83 ++++++++++++++++++----
 .../table/format/FlinkRowDataReaderContext.java    |  2 +-
 .../hudi/hadoop/HiveHoodieReaderContext.java       |  5 +-
 ...odieFileGroupReaderBasedParquetFileFormat.scala |  7 +-
 .../TestPositionBasedFileGroupRecordBuffer.java    | 13 ++--
 16 files changed, 150 insertions(+), 94 deletions(-)
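
The change in brief: HoodieFileGroupReader.getClosableIterator() now returns the ClosableIterator
interface rather than the concrete HoodieFileGroupReaderIterator, and a new
getClosableHoodieRecordIterator() wraps each engine-specific record in a HoodieRecord whose key
carries the actual partition path. A minimal consumption sketch, using the Avro engine type for
illustration (buildReader() is a hypothetical stand-in for the builder chain shown in the hunks
below):

    HoodieFileGroupReader<IndexedRecord> fileGroupReader = buildReader(); // hypothetical helper
    try (ClosableIterator<HoodieRecord<IndexedRecord>> iter = fileGroupReader.getClosableHoodieRecordIterator()) {
      while (iter.hasNext()) {
        HoodieRecord<IndexedRecord> record = iter.next();
        // each record carries a HoodieKey with the real partition path (see the reader-context hunks)
      }
    }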

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
index 6ac72a5d298..520b5a6de76 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.internal.schema.InternalSchema;
@@ -86,8 +87,7 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I, K, O> extends BaseFileGr
         .withLatestCommitTime(instantTime).withFileSlice(fileSlice).withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
         .withInternalSchema(internalSchemaOption).withProps(props).withShouldUseRecordPosition(false).build()) {
       // Reads the records from the file slice
-      try (HoodieFileGroupReader.HoodieFileGroupReaderIterator<RowData> recordIterator =
-               (HoodieFileGroupReader.HoodieFileGroupReaderIterator<RowData>) fileGroupReader.getClosableIterator()) {
+      try (ClosableIterator<RowData> recordIterator = (ClosableIterator<RowData>) fileGroupReader.getClosableIterator()) {
         while (recordIterator.hasNext()) {
           // Constructs Flink record for the Flink Parquet file writer
           RowData row = recordIterator.next();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
index 5446e6d6660..d604fe68b3f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
@@ -29,9 +29,9 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.table.read.HoodieFileGroupReader.HoodieFileGroupReaderIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.internal.schema.InternalSchema;
@@ -95,8 +95,7 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends Base
         .withLatestCommitTime(instantTime).withFileSlice(fileSlice).withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
         .withInternalSchema(internalSchemaOption).withProps(props).withShouldUseRecordPosition(usePosition).build()) {
       // Reads the records from the file slice
-      try (HoodieFileGroupReaderIterator<InternalRow> recordIterator
-               = (HoodieFileGroupReaderIterator<InternalRow>) fileGroupReader.getClosableIterator()) {
+      try (ClosableIterator<InternalRow> recordIterator = (ClosableIterator<InternalRow>) fileGroupReader.getClosableIterator()) {
         StructType sparkSchema = AvroConversionUtils.convertAvroSchemaToStructType(writeSchemaWithMetaFields);
         while (recordIterator.hasNext()) {
           // Constructs Spark record for the Spark Parquet file writer
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 8c26ddf559c..0084fb93b3c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -92,15 +92,16 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
 
   @Override
   public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
     if (bufferedRecord.isDelete()) {
       return new HoodieEmptyRecord<>(
-          new HoodieKey(bufferedRecord.getRecordKey(), null),
+          hoodieKey,
           HoodieRecord.HoodieRecordType.SPARK);
     }
 
     Schema schema = getSchemaFromBufferRecord(bufferedRecord);
     InternalRow row = bufferedRecord.getRecord();
-    return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
+    return new HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index 811a7020deb..ef100f1c718 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestMerger;
@@ -125,11 +126,12 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
     if (bufferedRecord.isDelete()) {
       return SpillableMapUtils.generateEmptyPayload(
           bufferedRecord.getRecordKey(),
-          null,
+          partitionPath,
           bufferedRecord.getOrderingValue(),
           payloadClass);
     }
-    return new HoodieAvroIndexedRecord(bufferedRecord.getRecord());
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
+    return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index c07b9b87e0c..638c58aa4d1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -73,6 +73,7 @@ public abstract class HoodieReaderContext<T> {
   private Boolean hasBootstrapBaseFile = null;
   private Boolean needsBootstrapMerge = null;
   private Boolean shouldMergeUseRecordPosition = null;
+  protected String partitionPath;
 
   // for encoding and decoding schemas to the spillable map
   private final LocalAvroSchemaCache localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
@@ -129,6 +130,10 @@ public abstract class HoodieReaderContext<T> {
     this.hasLogFiles = hasLogFiles;
   }
 
+  public void setPartitionPath(String partitionPath) {
+    this.partitionPath = partitionPath;
+  }
+
   // Getter and Setter for hasBootstrapBaseFile
   public boolean getHasBootstrapBaseFile() {
     return hasBootstrapBaseFile;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index d19ed4a56e3..0f89a6c426a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -35,7 +35,6 @@ import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
@@ -43,7 +42,6 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
@@ -80,8 +78,6 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
   protected final HoodieReaderContext<T> readerContext;
   protected final Schema readerSchema;
   protected final Option<String> orderingFieldName;
-  protected final Option<String> partitionNameOverrideOpt;
-  protected final Option<String[]> partitionPathFieldOpt;
   protected final RecordMergeMode recordMergeMode;
   protected final Option<HoodieRecordMerger> recordMerger;
   protected final Option<String> payloadClass;
@@ -101,14 +97,11 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
   protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                   HoodieTableMetaClient hoodieTableMetaClient,
                                   RecordMergeMode recordMergeMode,
-                                  Option<String> partitionNameOverrideOpt,
-                                  Option<String[]> partitionPathFieldOpt,
                                   TypedProperties props,
-                                  HoodieReadStats readStats) {
+                                  HoodieReadStats readStats,
+                                  Option<String> orderingFieldName) {
     this.readerContext = readerContext;
     this.readerSchema = AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
-    this.partitionNameOverrideOpt = partitionNameOverrideOpt;
-    this.partitionPathFieldOpt = partitionPathFieldOpt;
     this.recordMergeMode = recordMergeMode;
     this.recordMerger = readerContext.getRecordMerger();
     if (recordMerger.isPresent() && recordMerger.get().getMergingStrategy().equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
@@ -116,16 +109,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
     } else {
       this.payloadClass = Option.empty();
     }
-    this.orderingFieldName = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
-        ? Option.empty()
-        : Option.ofNullable(ConfigUtils.getOrderingField(props))
-        .or(() -> {
-          String preCombineField = hoodieTableMetaClient.getTableConfig().getPreCombineField();
-          if (StringUtils.isNullOrEmpty(preCombineField)) {
-            return Option.empty();
-          }
-          return Option.of(preCombineField);
-        });
+    this.orderingFieldName = orderingFieldName;
     this.props = props;
     this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
     this.hoodieTableMetaClient = hoodieTableMetaClient;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index f466db8d7cf..ed9e2582716 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -35,9 +36,11 @@ import org.apache.hudi.common.table.PartitionPathParser;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.CachingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
@@ -77,6 +80,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
   private final List<HoodieLogFile> logFiles;
   private final String partitionPath;
   private final Option<String[]> partitionPathFields;
+  private final Option<String> orderingFieldName;
   private final HoodieStorage storage;
   private final TypedProperties props;
   // Byte offset to start reading from the base file
@@ -143,6 +147,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, HoodieReaderConfig.MERGE_TYPE, true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
     readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge);
     readerContext.setHasLogFiles(!this.logFiles.isEmpty());
+    readerContext.setPartitionPath(partitionPath);
     if (readerContext.getHasLogFiles() && start != 0) {
       throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file");
     }
@@ -151,6 +156,16 @@ public final class HoodieFileGroupReader<T> implements Closeable {
         ? new PositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
         : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
     this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
+    this.orderingFieldName = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
+        ? Option.empty()
+        : Option.ofNullable(ConfigUtils.getOrderingField(props))
+        .or(() -> {
+          String preCombineField = hoodieTableMetaClient.getTableConfig().getPreCombineField();
+          if (StringUtils.isNullOrEmpty(preCombineField)) {
+            return Option.empty();
+          }
+          return Option.of(preCombineField);
+        });
     this.readStats = new HoodieReadStats();
     this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
         recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
@@ -161,27 +176,26 @@ public final class HoodieFileGroupReader<T> implements Closeable {
   /**
    * Initialize correct record buffer
    */
-  private static FileGroupRecordBuffer getRecordBuffer(HoodieReaderContext readerContext,
-                                                       HoodieTableMetaClient hoodieTableMetaClient,
-                                                       RecordMergeMode recordMergeMode,
-                                                       TypedProperties props,
-                                                       Option<HoodieBaseFile> baseFileOption,
-                                                       boolean hasNoLogFiles,
-                                                       boolean isSkipMerge,
-                                                       boolean shouldUseRecordPosition,
-                                                       HoodieReadStats readStats) {
+  private FileGroupRecordBuffer<T> getRecordBuffer(HoodieReaderContext<T> readerContext,
+                                                   HoodieTableMetaClient hoodieTableMetaClient,
+                                                   RecordMergeMode recordMergeMode,
+                                                   TypedProperties props,
+                                                   Option<HoodieBaseFile> baseFileOption,
+                                                   boolean hasNoLogFiles,
+                                                   boolean isSkipMerge,
+                                                   boolean shouldUseRecordPosition,
+                                                   HoodieReadStats readStats) {
     if (hasNoLogFiles) {
       return null;
     } else if (isSkipMerge) {
       return new UnmergedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats);
     } else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
       return new PositionBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(),
-          Option.empty(), baseFileOption.get().getCommitTime(), props, readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName);
     } else {
       return new KeyBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
     }
   }
 
@@ -352,11 +366,21 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     }
   }
 
-  public HoodieFileGroupReaderIterator<T> getClosableIterator() throws IOException {
+  public ClosableIterator<T> getClosableIterator() throws IOException {
     initRecordIterators();
     return new HoodieFileGroupReaderIterator<>(this);
   }
 
+  /**
+   * @return An iterator over the records that wraps the engine-specific record in a HoodieRecord.
+   */
+  public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws IOException {
+    return new CloseableMappingIterator<>(getClosableIterator(), nextRecord -> {
+      BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+      return readerContext.constructHoodieRecord(bufferedRecord);
+    });
+  }
+
   public static class HoodieFileGroupReaderIterator<T> implements ClosableIterator<T> {
     private HoodieFileGroupReader<T> reader;
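
A note on the implementation above: getClosableHoodieRecordIterator() delegates to
CloseableMappingIterator, which applies a transform lazily and forwards close() to the wrapped
iterator. A simplified sketch of that pattern (illustrative only, not the actual Hudi class):

    class MappingIterator<I, O> implements ClosableIterator<O> {
      private final ClosableIterator<I> inner;
      private final java.util.function.Function<I, O> mapper;

      MappingIterator(ClosableIterator<I> inner, java.util.function.Function<I, O> mapper) {
        this.inner = inner;
        this.mapper = mapper;
      }

      @Override public boolean hasNext() { return inner.hasNext(); }   // pull-based, no buffering
      @Override public O next() { return mapper.apply(inner.next()); } // map one record at a time
      @Override public void close() { inner.close(); }                 // closing releases the source
    }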
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index 7a0e1aacd22..2ccc740fb77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -51,11 +51,10 @@ public class KeyBasedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
   public KeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                        HoodieTableMetaClient hoodieTableMetaClient,
                                        RecordMergeMode recordMergeMode,
-                                       Option<String> partitionNameOverrideOpt,
-                                       Option<String[]> partitionPathFieldOpt,
                                        TypedProperties props,
-                                       HoodieReadStats readStats) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+                                       HoodieReadStats readStats,
+                                       Option<String> orderingFieldName) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 92bf9a98137..5df2ea91e07 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -68,12 +68,11 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco
   public PositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                             HoodieTableMetaClient hoodieTableMetaClient,
                                             RecordMergeMode recordMergeMode,
-                                            Option<String> partitionNameOverrideOpt,
-                                            Option<String[]> partitionPathFieldOpt,
                                             String baseFileInstantTime,
                                             TypedProperties props,
-                                            HoodieReadStats readStats) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+                                            HoodieReadStats readStats,
+                                            Option<String> orderingFieldName) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
     this.baseFileInstantTime = baseFileInstantTime;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index ef686fb7bfc..c853eb088b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -49,11 +49,9 @@ public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
       HoodieReaderContext<T> readerContext,
       HoodieTableMetaClient hoodieTableMetaClient,
       RecordMergeMode recordMergeMode,
-      Option<String> partitionNameOverrideOpt,
-      Option<String[]> partitionPathFieldOpt,
       TypedProperties props,
       HoodieReadStats readStats) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, Option.empty());
     this.currentInstantLogBlocks = new ArrayDeque<>();
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index daac251d187..06552a9391c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -84,8 +84,6 @@ class TestFileGroupRecordBuffer {
   private final FileGroupReaderSchemaHandler schemaHandler =
       mock(FileGroupReaderSchemaHandler.class);
   private HoodieTableMetaClient hoodieTableMetaClient = mock(HoodieTableMetaClient.class);
-  private Option<String> partitionNameOverrideOpt = Option.empty();
-  private Option<String[]> partitionPathFieldOpt = Option.empty();
   private TypedProperties props = new TypedProperties();
   private HoodieReadStats readStats = mock(HoodieReadStats.class);
 
@@ -290,10 +288,9 @@ class TestFileGroupRecordBuffer {
             readerContext,
             hoodieTableMetaClient,
             RecordMergeMode.COMMIT_TIME_ORDERING,
-            partitionNameOverrideOpt,
-            partitionPathFieldOpt,
             props,
-            readStats);
+            readStats,
+            Option.empty());
     when(readerContext.getValue(any(), any(), any())).thenReturn(null);
     assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
 
@@ -303,10 +300,9 @@ class TestFileGroupRecordBuffer {
             readerContext,
             hoodieTableMetaClient,
             RecordMergeMode.COMMIT_TIME_ORDERING,
-            partitionNameOverrideOpt,
-            partitionPathFieldOpt,
             props,
-            readStats);
+            readStats,
+            Option.empty());
     when(readerContext.getValue(any(), any(), any())).thenReturn("i");
     assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
     when(readerContext.getValue(any(), any(), any())).thenReturn("d");
@@ -325,10 +321,9 @@ class TestFileGroupRecordBuffer {
             readerContext,
             hoodieTableMetaClient,
             RecordMergeMode.COMMIT_TIME_ORDERING,
-            partitionNameOverrideOpt,
-            partitionPathFieldOpt,
             props,
-            readStats);
+            readStats,
+            Option.empty());
 
     // CASE 1: With custom delete marker.
     GenericRecord record = new GenericData.Record(schema);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 0890e574920..7c8d7527cb0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -68,6 +68,7 @@ import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -318,6 +319,13 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     // validate size is equivalent to ensure no duplicates are returned
     assertEquals(expectedRecords.size(), actualRecordList.size());
     assertEquals(new HashSet<>(expectedRecords), new HashSet<>(actualRecordList));
+    // validate records can be read from file group as HoodieRecords
+    actualRecordList = convertHoodieRecords(
+        readHoodieRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlices, avroSchema, recordMergeMode),
+        avroSchema, readerContext);
+    assertEquals(expectedRecords.size(), actualRecordList.size());
+    assertEquals(new HashSet<>(expectedRecords), new HashSet<>(actualRecordList));
+    // validate unmerged records
     actualRecordList = convertEngineRecords(
         readRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlices, avroSchema, recordMergeMode, true),
         avroSchema, readerContext);
@@ -372,21 +380,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                            boolean isSkipMerge) {
 
     List<T> actualRecordList = new ArrayList<>();
-    TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.datasource.write.precombine.field", PRECOMBINE_FIELD_NAME);
-    props.setProperty("hoodie.payload.ordering.field", PRECOMBINE_FIELD_NAME);
-    props.setProperty(RECORD_MERGE_MODE.key(), recordMergeMode.name());
-    if (recordMergeMode.equals(RecordMergeMode.CUSTOM)) {
-      props.setProperty(RECORD_MERGE_STRATEGY_ID.key(), PAYLOAD_BASED_MERGE_STRATEGY_UUID);
-      props.setProperty(PAYLOAD_CLASS_NAME.key(), getCustomPayload());
-    }
-    props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
-    props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), metaClient.getTempFolderPath());
-    props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
-    props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");
-    if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
-      props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
-    }
+    TypedProperties props = buildProperties(metaClient, recordMergeMode);
     if (isSkipMerge) {
       props.setProperty(HoodieReaderConfig.MERGE_TYPE.key(), HoodieReaderConfig.REALTIME_SKIP_MERGE);
     }
@@ -431,6 +425,47 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     }
   }
 
+  private List<HoodieRecord<T>> readHoodieRecordsFromFileGroup(StorageConfiguration<?> storageConf,
+                                                               String tablePath,
+                                                               HoodieTableMetaClient metaClient,
+                                                               List<FileSlice> fileSlices,
+                                                               Schema avroSchema,
+                                                               RecordMergeMode recordMergeMode) {
+
+    List<HoodieRecord<T>> actualRecordList = new ArrayList<>();
+    TypedProperties props = buildProperties(metaClient, recordMergeMode);
+    fileSlices.forEach(fileSlice -> {
+      try (HoodieFileGroupReader<T> fileGroupReader = getHoodieFileGroupReader(storageConf, tablePath, metaClient, avroSchema, fileSlice, 0, props);
+           ClosableIterator<HoodieRecord<T>> iter = fileGroupReader.getClosableHoodieRecordIterator()) {
+        while (iter.hasNext()) {
+          actualRecordList.add(iter.next());
+        }
+      } catch (IOException ex) {
+        throw new UncheckedIOException(ex);
+      }
+    });
+    return actualRecordList;
+  }
+
+  private TypedProperties buildProperties(HoodieTableMetaClient metaClient, RecordMergeMode recordMergeMode) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.datasource.write.precombine.field", PRECOMBINE_FIELD_NAME);
+    props.setProperty("hoodie.payload.ordering.field", PRECOMBINE_FIELD_NAME);
+    props.setProperty(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+    if (recordMergeMode.equals(RecordMergeMode.CUSTOM)) {
+      props.setProperty(RECORD_MERGE_STRATEGY_ID.key(), PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+      props.setProperty(PAYLOAD_CLASS_NAME.key(), getCustomPayload());
+    }
+    props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
+    props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), metaClient.getTempFolderPath());
+    props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
+    props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");
+    if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
+      props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
+    }
+    return props;
+  }
+
   private boolean shouldValidatePartialRead(FileSlice fileSlice, Schema requestedSchema) {
     if (fileSlice.getLogFiles().findAny().isPresent()) {
       return true;
@@ -466,6 +501,24 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
         .collect(Collectors.toList());
   }
 
+  private List<HoodieTestDataGenerator.RecordIdentifier> convertHoodieRecords(List<HoodieRecord<T>> records, Schema schema, HoodieReaderContext<T> readerContext) {
+    return records.stream()
+        .map(record -> new HoodieTestDataGenerator.RecordIdentifier(
+            record.getRecordKey(),
+            removeHiveStylePartition(record.getPartitionPath()),
+            record.getOrderingValue(schema, new TypedProperties()).toString(),
+            readerContext.getValue(record.getData(), schema, RIDER_FIELD_NAME).toString()))
+        .collect(Collectors.toList());
+  }
+
+  private static String removeHiveStylePartition(String partitionPath) {
+    int indexOf = partitionPath.indexOf("=");
+    if (indexOf > 0) {
+      return partitionPath.substring(indexOf + 1);
+    }
+    return partitionPath;
+  }
+
   private void extract(Path target) throws IOException {
     try (ZipInputStream zip = new ZipInputStream(this.getClass().getClassLoader().getResourceAsStream("file-group-reader/bootstrap_data.zip"))) {
       ZipEntry entry;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 79077c888b7..808860478b3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -146,7 +146,7 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
 
   @Override
   public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> bufferedRecord) {
-    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), null);
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
     // delete record
     if (bufferedRecord.isDelete()) {
       return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, bufferedRecord.getOrderingValue(), HoodieRecord.HoodieRecordType.FLINK);
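
The same pattern repeats across the reader contexts touched by this commit: HoodieFileGroupReader
seeds the context once at construction time, and each engine's constructHoodieRecord then builds a
complete key instead of passing a null partition. Sketch of the flow, using names from the hunks:

    // in HoodieFileGroupReader's constructor:
    readerContext.setPartitionPath(partitionPath);
    // in each constructHoodieRecord implementation (Spark, Flink, Hive, Avro):
    HoodieKey key = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath); // previously null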
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index e5440c4e5c5..fa051360b3f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -214,14 +214,15 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
 
   @Override
   public HoodieRecord<ArrayWritable> constructHoodieRecord(BufferedRecord<ArrayWritable> bufferedRecord) {
+    HoodieKey key = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
     if (bufferedRecord.isDelete()) {
       return new HoodieEmptyRecord<>(
-          new HoodieKey(bufferedRecord.getRecordKey(), null),
+          key,
           HoodieRecord.HoodieRecordType.HIVE);
     }
     Schema schema = getSchemaFromBufferRecord(bufferedRecord);
     ArrayWritable writable = bufferedRecord.getRecord();
-    return new HoodieHiveRecord(new HoodieKey(bufferedRecord.getRecordKey(), null), writable, schema, objectInspectorCache);
+    return new HoodieHiveRecord(key, writable, schema, objectInspectorCache);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index fc9d0bcb208..afab0f391bc 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.common.util.collection.ClosableIterator
 import org.apache.hudi.data.CloseableIteratorListener
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.io.IOUtils
@@ -250,7 +251,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
       props)
   }
 
-  private def appendPartitionAndProject(iter: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
+  private def appendPartitionAndProject(iter: ClosableIterator[InternalRow],
                                         inputSchema: StructType,
                                         partitionSchema: StructType,
                                         to: StructType,
@@ -273,14 +274,14 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
     }
   }
 
-  private def projectSchema(iter: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
+  private def projectSchema(iter: ClosableIterator[InternalRow],
                             from: StructType,
                             to: StructType): Iterator[InternalRow] = {
     val unsafeProjection = generateUnsafeProjection(from, to)
     makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(d))
   }
 
-  private def makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
+  private def makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: ClosableIterator[InternalRow],
                                                           mappingFunction: Function[InternalRow, InternalRow]): Iterator[InternalRow] = {
     CloseableIteratorListener.addListener(closeableFileGroupRecordIterator)
     new Iterator[InternalRow] with Closeable {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index e0a59b563c8..47e7f42d12d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -40,7 +40,6 @@ import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -82,7 +81,7 @@ public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupR
     writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
     writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
     writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
-    writeConfigs.put("hoodie.datasource.write.precombine.field",mergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING) ? "" : "timestamp");
+    writeConfigs.put("hoodie.datasource.write.precombine.field", mergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING) ? "" : "timestamp");
     writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
     writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
     writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
@@ -106,11 +105,8 @@ public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupR
 
     metaClient = createMetaClient(getStorageConf(), getBasePath());
     avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
-    Option<String[]> partitionFields = metaClient.getTableConfig().getPartitionFields();
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPaths[0])
-        ? Option.empty() : Option.of(partitionPaths[0]);
 
-    HoodieReaderContext ctx = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf(), metaClient);
+    HoodieReaderContext<InternalRow> ctx = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf(), metaClient);
     ctx.setTablePath(getBasePath());
     ctx.setLatestCommitTime(metaClient.createNewInstantTime());
     ctx.setShouldMergeUseRecordPosition(true);
@@ -139,11 +135,10 @@ public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupR
         ctx,
         metaClient,
         mergeMode,
-        partitionNameOpt,
-        partitionFields,
         baseFileInstantTime,
         props,
-        readStats);
+        readStats,
+        Option.of("timestamp"));
   }
 
   public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader(boolean shouldWriteRecordPositions,

