This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bf1a8a483e00 [MINOR] Set prefixed merge properties properly (#13758)
bf1a8a483e00 is described below

commit bf1a8a483e0008d0d9a645803c0ce2092fdae680
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Aug 25 15:26:28 2025 +0800

    [MINOR] Set prefixed merge properties properly (#13758)
---
 .../src/main/java/org/apache/hudi/index/HoodieIndexUtils.java        | 2 +-
 .../java/org/apache/hudi/table/action/commit/BaseWriteHelper.java    | 5 +++--
 .../java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java  | 3 +--
 .../java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java   | 3 +--
 .../java/org/apache/hudi/table/action/commit/JavaWriteHelper.java    | 3 +--
 .../apache/hudi/common/table/read/BufferedRecordMergerFactory.java   | 2 +-
 6 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index fac3bea8be07..c90bbcb5172b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -556,7 +556,7 @@ public class HoodieIndexUtils {
         orderingFieldNames,
         writerSchema.get(),
         
Option.ofNullable(Pair.of(hoodieTable.getMetaClient().getTableConfig().getPayloadClass(),
 hoodieTable.getConfig().getPayloadClass())),
-        hoodieTable.getConfig().getProps(),
+        properties,
         hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
     String[] orderingFieldsArray = orderingFieldNames.toArray(new String[0]);
     HoodieData<HoodieRecord<R>> taggedUpdatingRecords = untaggedUpdatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 5d7e12217a80..fbe4d1cb3c62 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -115,6 +115,7 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
       recordSchema = new Schema.Parser().parse(table.getConfig().getWriteSchema());
     }
     recordSchema = AvroSchemaCache.intern(recordSchema);
+    TypedProperties mergedProperties = readerContext.getMergeProps(table.getConfig().getProps());
     BufferedRecordMerger<T> bufferedRecordMerger = BufferedRecordMergerFactory.create(
         readerContext,
         readerContext.getMergeMode(),
@@ -123,14 +124,14 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
         orderingFieldNames,
         Option.ofNullable(table.getConfig().getPayloadClass()),
         recordSchema,
-        table.getConfig().getProps(),
+        mergedProperties,
         tableConfig.getPartialUpdateMode());
     return deduplicateRecords(
         records,
         table.getIndex(),
         parallelism,
         table.getConfig().getSchema(),
-        table.getConfig().getProps(),
+        mergedProperties,
         bufferedRecordMerger,
         readerContext,
         orderingFieldNames.toArray(new String[0]));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 94f2bbc83ec3..adfedf72db06 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -65,7 +65,6 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
     RecordContext<T> recordContext = readerContext.getRecordContext();
-    TypedProperties mergedProperties = readerContext.getMergeProps(props);
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
@@ -75,7 +74,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
       //       an instance of [[InternalRow]] pointing into shared, mutable buffer
       return Pair.of(key, record.copy());
     }).reduceByKey(
-        (previous, next) -> reduceRecords(mergedProperties, recordMerger, orderingFieldNames, previous, next, schema.get(), recordContext),
+        (previous, next) -> reduceRecords(props, recordMerger, orderingFieldNames, previous, next, schema.get(), recordContext),
         parallelism).map(Pair::getRight);
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 069730a5ae31..13a30b228fde 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -104,9 +104,8 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, Iterator<HoodieRe
 
     // caution that the avro schema is not serializable
     final Schema schema = new Schema.Parser().parse(schemaStr);
-    TypedProperties mergedProperties = readerContext.getMergeProps(props);
     return keyedRecords.values().stream().map(x -> x.stream().reduce((previous, next) ->
-      reduceRecords(mergedProperties, recordMerger, orderingFieldNames, previous, next, schema, readerContext.getRecordContext())
+      reduceRecords(props, recordMerger, orderingFieldNames, previous, next, schema, readerContext.getRecordContext())
     ).orElse(null)).filter(Objects::nonNull).iterator();
   }
 }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index c1f81c2e1672..efed521190c9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -75,9 +75,8 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
     }).collect(Collectors.groupingBy(Pair::getLeft));
 
     final Schema schema = new Schema.Parser().parse(schemaStr);
-    TypedProperties mergedProperties = readerContext.getMergeProps(props);
     return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((previous, next) ->
-        reduceRecords(mergedProperties, recordMerger, orderingFieldNames, previous, next, schema, readerContext.getRecordContext())
+        reduceRecords(props, recordMerger, orderingFieldNames, previous, next, schema, readerContext.getRecordContext())
     ).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 5fd50ffcfd00..9e6429ff0d40 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -83,7 +83,7 @@ public class BufferedRecordMergerFactory {
     }
 
     // might need to introduce a merge config for the factory in the future to get rid of this.
-    props = readerContext.getMergeProps(props);
+    // props = readerContext.getMergeProps(props);
     switch (recordMergeMode) {
       case COMMIT_TIME_ORDERING:
         if (partialUpdateModeOpt.isEmpty()) {

Reply via email to