This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a31f15d22a21 fix: infer record merge mode for pre-v9 tables in generateRequiredSchema (#18106)
a31f15d22a21 is described below

commit a31f15d22a21713b83d11bfa6487b5541c69c792
Author: vamsikarnika <[email protected]>
AuthorDate: Mon Mar 2 20:44:50 2026 +0530

    fix: infer record merge mode for pre-v9 tables in generateRequiredSchema (#18106)
---
 .../table/read/FileGroupReaderSchemaHandler.java   | 28 ++++----
 .../read/TestFileGroupReaderSchemaHandler.java     | 78 +++++++++++++++++++++-
 2 files changed, 90 insertions(+), 16 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 723bd2ddeea2..54b0e21fee78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -175,7 +175,17 @@ public class FileGroupReaderSchemaHandler<T> {
       return requestedSchema;
     }
 
-    if (hoodieTableConfig.getRecordMergeMode() == RecordMergeMode.CUSTOM) {
+    RecordMergeMode mergeMode = hoodieTableConfig.getRecordMergeMode();
+    if (hoodieTableConfig.getTableVersion().lesserThan(HoodieTableVersion.NINE)) {
+      Triple<RecordMergeMode, String, String> mergingConfigs = inferMergingConfigsForPreV9Table(
+          hoodieTableConfig.getRecordMergeMode(),
+          hoodieTableConfig.getPayloadClass(),
+          hoodieTableConfig.getRecordMergeStrategyId(),
+          hoodieTableConfig.getOrderingFieldsStr().orElse(null),
+          hoodieTableConfig.getTableVersion());
+      mergeMode = mergingConfigs.getLeft();
+    }
+    if (mergeMode == RecordMergeMode.CUSTOM) {
       if (!readerContext.getRecordMerger().get().isProjectionCompatible()) {
         return this.tableSchema;
       }
@@ -184,7 +194,7 @@ public class FileGroupReaderSchemaHandler<T> {
     List<HoodieSchemaField> addedFields = new ArrayList<>();
     for (String field : getMandatoryFieldsForMerging(
         hoodieTableConfig, this.properties, this.tableSchema, readerContext.getRecordMerger(),
-        deleteContext.hasBuiltInDeleteField(), deleteContext.getCustomDeleteMarkerKeyValue(), hasInstantRange)) {
+        deleteContext.hasBuiltInDeleteField(), deleteContext.getCustomDeleteMarkerKeyValue(), hasInstantRange, mergeMode)) {
       if (!findNestedField(requestedSchema, field).isPresent()) {
         addedFields.add(getField(this.tableSchema, field));
       }
@@ -203,18 +213,8 @@ public class FileGroupReaderSchemaHandler<T> {
                                                         Option<HoodieRecordMerger> recordMerger,
                                                         boolean hasBuiltInDelete,
                                                         Option<Pair<String, String>> customDeleteMarkerKeyAndValue,
-                                                       boolean hasInstantRange) {
-    RecordMergeMode mergeMode = cfg.getRecordMergeMode();
-    if (cfg.getTableVersion().lesserThan(HoodieTableVersion.NINE)) {
-      Triple<RecordMergeMode, String, String> mergingConfigs = inferMergingConfigsForPreV9Table(
-          cfg.getRecordMergeMode(),
-          cfg.getPayloadClass(),
-          cfg.getRecordMergeStrategyId(),
-          cfg.getOrderingFieldsStr().orElse(null),
-          cfg.getTableVersion());
-      mergeMode = mergingConfigs.getLeft();
-    }
-
+                                                       boolean hasInstantRange,
+                                                       RecordMergeMode mergeMode) {
     if (mergeMode == RecordMergeMode.CUSTOM) {
       return recordMerger.get().getMandatoryFieldsForMerging(tableSchema, cfg, props);
     }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
index e2a2b7c34c70..aaa9bf7119c0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -232,7 +233,7 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
     when(hoodieTableConfig.getOrderingFieldsStr()).thenReturn(Option.of(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING));
     when(hoodieTableConfig.getOrderingFields()).thenReturn(setPrecombine ? Collections.singletonList(preCombineField) : Collections.emptyList());
     when(hoodieTableConfig.getTableVersion()).thenReturn(tableVersion);
-    if (hoodieTableConfig.getTableVersion() == HoodieTableVersion.SIX) {
+    if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
       if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
         when(hoodieTableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
       } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
@@ -263,7 +264,12 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
     if (addHoodieIsDeleted) {
       expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
     }
-    HoodieSchema expectedSchema = ((mergeMode == RecordMergeMode.CUSTOM) && !isProjectionCompatible) ? dataSchema : SchemaTestUtil.getSchemaFromFields(expectedFields);
+    // For pre-v9 tables with null mergeMode, the effective merge mode is inferred from the payload class
+    RecordMergeMode effectiveMergeMode = mergeMode;
+    if (mergeMode == null && tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+      effectiveMergeMode = HoodieTableConfig.inferRecordMergeModeFromPayloadClass(hoodieTableConfig.getPayloadClass());
+    }
+    HoodieSchema expectedSchema = ((effectiveMergeMode == RecordMergeMode.CUSTOM) && !isProjectionCompatible) ? dataSchema : SchemaTestUtil.getSchemaFromFields(expectedFields);
     when(recordMerger.getMandatoryFieldsForMerging(dataSchema, hoodieTableConfig, props)).thenReturn(expectedFields.toArray(new String[0]));
 
     DeleteContext deleteContext = new DeleteContext(props, dataSchema);
@@ -276,4 +282,72 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
     HoodieSchema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema(deleteContext);
     assertEquals(expectedSchema, actualSchema);
   }
+
+  private static Stream<Arguments> testGenerateRequiredSchemaPreV9CustomPayloadParams() {
+    return Stream.of(
+        // OverwriteNonDefaultsWithLatestAvroPayload → infers CUSTOM merge mode
+        // → merger not projection compatible → full data schema returned
+        Arguments.of(
+            OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+            null,
+            Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, "colA", "colB")),
+        // OverwriteWithLatestAvroPayload → infers COMMIT_TIME_ORDERING merge mode
+        // → not CUSTOM → requested schema returned as-is
+        Arguments.of(
+            OverwriteWithLatestAvroPayload.class.getName(),
+            null,
+            Collections.singletonList(HoodieRecord.RECORD_KEY_METADATA_FIELD)),
+        // null payload class + ordering field → infers EVENT_TIME_ORDERING merge mode
+        // → ordering field is appended to the requested schema
+        Arguments.of(
+            null,
+            "colA",
+            Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, "colA"))
+    );
+  }
+
+  /**
+   * Tests that for a pre-v9 table (e.g., version 6) where getRecordMergeMode() returns null
+   * (because the property didn't exist), generateRequiredSchema correctly infers the merge mode
+   * from the payload class and returns the appropriate schema.
+   */
+  @ParameterizedTest
+  @MethodSource("testGenerateRequiredSchemaPreV9CustomPayloadParams")
+  public void testGenerateRequiredSchemaPreV9CustomPayload(String payloadClass,
+                                                           String orderingField,
+                                                           List<String> expectedFieldNames) {
+    HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+    when(readerContext.getInstantRange()).thenReturn(Option.empty());
+    when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
+    when(readerContext.getHasLogFiles()).thenReturn(true);
+    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+    when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
+    when(recordMerger.isProjectionCompatible()).thenReturn(false);
+
+    when(hoodieTableConfig.getRecordMergeMode()).thenReturn(null);
+    when(hoodieTableConfig.getTableVersion()).thenReturn(HoodieTableVersion.SIX);
+    when(hoodieTableConfig.getPayloadClass()).thenReturn(payloadClass);
+    when(hoodieTableConfig.getRecordMergeStrategyId()).thenReturn(null);
+    when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
+    if (orderingField != null) {
+      when(hoodieTableConfig.getOrderingFieldsStr()).thenReturn(Option.of(orderingField));
+      when(hoodieTableConfig.getOrderingFields()).thenReturn(Collections.singletonList(orderingField));
+    } else {
+      when(hoodieTableConfig.getOrderingFieldsStr()).thenReturn(Option.empty());
+    }
+
+    HoodieSchema dataSchema = SchemaTestUtil.getSchemaFromFields(Arrays.asList(
+        HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+        "colA", "colB"));
+    HoodieSchema requestedSchema = SchemaTestUtil.getSchemaFromFields(
+        Collections.singletonList(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+    HoodieSchema expectedSchema = SchemaTestUtil.getSchemaFromFields(expectedFieldNames);
+
+    DeleteContext deleteContext = new DeleteContext(new TypedProperties(), dataSchema);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext,
+        dataSchema, requestedSchema, Option.empty(), new TypedProperties(), metaClient);
+    HoodieSchema actualSchema = schemaHandler.generateRequiredSchema(deleteContext);
+
+    assertEquals(expectedSchema, actualSchema);
+  }
 }

Reply via email to