This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new a31f15d22a21 fix: infer record merge mode for pre-v9 tables in generateRequiredSchema (#18106)
a31f15d22a21 is described below
commit a31f15d22a21713b83d11bfa6487b5541c69c792
Author: vamsikarnika <[email protected]>
AuthorDate: Mon Mar 2 20:44:50 2026 +0530
    fix: infer record merge mode for pre-v9 tables in generateRequiredSchema (#18106)
---
.../table/read/FileGroupReaderSchemaHandler.java | 28 ++++----
.../read/TestFileGroupReaderSchemaHandler.java | 78 +++++++++++++++++++++-
2 files changed, 90 insertions(+), 16 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 723bd2ddeea2..54b0e21fee78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -175,7 +175,17 @@ public class FileGroupReaderSchemaHandler<T> {
       return requestedSchema;
     }
 
-    if (hoodieTableConfig.getRecordMergeMode() == RecordMergeMode.CUSTOM) {
+    RecordMergeMode mergeMode = hoodieTableConfig.getRecordMergeMode();
+    if (hoodieTableConfig.getTableVersion().lesserThan(HoodieTableVersion.NINE)) {
+      Triple<RecordMergeMode, String, String> mergingConfigs = inferMergingConfigsForPreV9Table(
+          hoodieTableConfig.getRecordMergeMode(),
+          hoodieTableConfig.getPayloadClass(),
+          hoodieTableConfig.getRecordMergeStrategyId(),
+          hoodieTableConfig.getOrderingFieldsStr().orElse(null),
+          hoodieTableConfig.getTableVersion());
+      mergeMode = mergingConfigs.getLeft();
+    }
+    if (mergeMode == RecordMergeMode.CUSTOM) {
       if (!readerContext.getRecordMerger().get().isProjectionCompatible()) {
         return this.tableSchema;
       }
@@ -184,7 +194,7 @@ public class FileGroupReaderSchemaHandler<T> {
     List<HoodieSchemaField> addedFields = new ArrayList<>();
     for (String field : getMandatoryFieldsForMerging(
         hoodieTableConfig, this.properties, this.tableSchema, readerContext.getRecordMerger(),
-        deleteContext.hasBuiltInDeleteField(), deleteContext.getCustomDeleteMarkerKeyValue(), hasInstantRange)) {
+        deleteContext.hasBuiltInDeleteField(), deleteContext.getCustomDeleteMarkerKeyValue(), hasInstantRange, mergeMode)) {
       if (!findNestedField(requestedSchema, field).isPresent()) {
         addedFields.add(getField(this.tableSchema, field));
       }
@@ -203,18 +213,8 @@ public class FileGroupReaderSchemaHandler<T> {
                                                       Option<HoodieRecordMerger> recordMerger,
                                                       boolean hasBuiltInDelete,
                                                       Option<Pair<String, String>> customDeleteMarkerKeyAndValue,
-                                                      boolean hasInstantRange) {
-    RecordMergeMode mergeMode = cfg.getRecordMergeMode();
-    if (cfg.getTableVersion().lesserThan(HoodieTableVersion.NINE)) {
-      Triple<RecordMergeMode, String, String> mergingConfigs = inferMergingConfigsForPreV9Table(
-          cfg.getRecordMergeMode(),
-          cfg.getPayloadClass(),
-          cfg.getRecordMergeStrategyId(),
-          cfg.getOrderingFieldsStr().orElse(null),
-          cfg.getTableVersion());
-      mergeMode = mergingConfigs.getLeft();
-    }
-
+                                                      boolean hasInstantRange,
+                                                      RecordMergeMode mergeMode) {
     if (mergeMode == RecordMergeMode.CUSTOM) {
       return recordMerger.get().getMandatoryFieldsForMerging(tableSchema, cfg, props);
     }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
index e2a2b7c34c70..aaa9bf7119c0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -232,7 +233,7 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
     when(hoodieTableConfig.getOrderingFieldsStr()).thenReturn(Option.of(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING));
     when(hoodieTableConfig.getOrderingFields()).thenReturn(setPrecombine ? Collections.singletonList(preCombineField) : Collections.emptyList());
     when(hoodieTableConfig.getTableVersion()).thenReturn(tableVersion);
-    if (hoodieTableConfig.getTableVersion() == HoodieTableVersion.SIX) {
+    if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
       if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
         when(hoodieTableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
       } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
@@ -263,7 +264,12 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
     if (addHoodieIsDeleted) {
       expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
     }
-    HoodieSchema expectedSchema = ((mergeMode == RecordMergeMode.CUSTOM) && !isProjectionCompatible) ? dataSchema : SchemaTestUtil.getSchemaFromFields(expectedFields);
+    // For pre-v9 tables with null mergeMode, the effective merge mode is inferred from the payload class
+    RecordMergeMode effectiveMergeMode = mergeMode;
+    if (mergeMode == null && tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+      effectiveMergeMode = HoodieTableConfig.inferRecordMergeModeFromPayloadClass(hoodieTableConfig.getPayloadClass());
+    }
+    HoodieSchema expectedSchema = ((effectiveMergeMode == RecordMergeMode.CUSTOM) && !isProjectionCompatible) ? dataSchema : SchemaTestUtil.getSchemaFromFields(expectedFields);
     when(recordMerger.getMandatoryFieldsForMerging(dataSchema, hoodieTableConfig, props)).thenReturn(expectedFields.toArray(new String[0]));
     DeleteContext deleteContext = new DeleteContext(props, dataSchema);
@@ -276,4 +282,72 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
     HoodieSchema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema(deleteContext);
     assertEquals(expectedSchema, actualSchema);
   }
+
+  private static Stream<Arguments> testGenerateRequiredSchemaPreV9CustomPayloadParams() {
+    return Stream.of(
+        // OverwriteNonDefaultsWithLatestAvroPayload → infers CUSTOM merge mode
+        // → merger not projection compatible → full data schema returned
+        Arguments.of(
+            OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+            null,
+            Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, "colA", "colB")),
+        // OverwriteWithLatestAvroPayload → infers COMMIT_TIME_ORDERING merge mode
+        // → not CUSTOM → requested schema returned as-is
+        Arguments.of(
+            OverwriteWithLatestAvroPayload.class.getName(),
+            null,
+            Collections.singletonList(HoodieRecord.RECORD_KEY_METADATA_FIELD)),
+        // null payload class + ordering field → infers EVENT_TIME_ORDERING merge mode
+        // → ordering field is appended to the requested schema
+        Arguments.of(
+            null,
+            "colA",
+            Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, "colA"))
+    );
+  }
+
+  /**
+   * Tests that for a pre-v9 table (e.g., version 6) where getRecordMergeMode() returns null
+   * (because the property didn't exist), generateRequiredSchema correctly infers the merge mode
+   * from the payload class and returns the appropriate schema.
+   */
+  @ParameterizedTest
+  @MethodSource("testGenerateRequiredSchemaPreV9CustomPayloadParams")
+  public void testGenerateRequiredSchemaPreV9CustomPayload(String payloadClass,
+                                                           String orderingField,
+                                                           List<String> expectedFieldNames) {
+    HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+    when(readerContext.getInstantRange()).thenReturn(Option.empty());
+    when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
+    when(readerContext.getHasLogFiles()).thenReturn(true);
+    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+    when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
+    when(recordMerger.isProjectionCompatible()).thenReturn(false);
+
+    when(hoodieTableConfig.getRecordMergeMode()).thenReturn(null);
+    when(hoodieTableConfig.getTableVersion()).thenReturn(HoodieTableVersion.SIX);
+    when(hoodieTableConfig.getPayloadClass()).thenReturn(payloadClass);
+    when(hoodieTableConfig.getRecordMergeStrategyId()).thenReturn(null);
+    when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
+    if (orderingField != null) {
+      when(hoodieTableConfig.getOrderingFieldsStr()).thenReturn(Option.of(orderingField));
+      when(hoodieTableConfig.getOrderingFields()).thenReturn(Collections.singletonList(orderingField));
+    } else {
+      when(hoodieTableConfig.getOrderingFieldsStr()).thenReturn(Option.empty());
+    }
+
+    HoodieSchema dataSchema = SchemaTestUtil.getSchemaFromFields(Arrays.asList(
+        HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+        "colA", "colB"));
+    HoodieSchema requestedSchema = SchemaTestUtil.getSchemaFromFields(
+        Collections.singletonList(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+    HoodieSchema expectedSchema = SchemaTestUtil.getSchemaFromFields(expectedFieldNames);
+
+    DeleteContext deleteContext = new DeleteContext(new TypedProperties(), dataSchema);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext,
+        dataSchema, requestedSchema, Option.empty(), new TypedProperties(), metaClient);
+    HoodieSchema actualSchema = schemaHandler.generateRequiredSchema(deleteContext);
+
+    assertEquals(expectedSchema, actualSchema);
+  }
 }