This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf1a8a483e00 [MINOR] Set prefixed merge properties properly (#13758)
bf1a8a483e00 is described below
commit bf1a8a483e0008d0d9a645803c0ce2092fdae680
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Aug 25 15:26:28 2025 +0800
[MINOR] Set prefixed merge properties properly (#13758)
---
.../src/main/java/org/apache/hudi/index/HoodieIndexUtils.java | 2 +-
.../java/org/apache/hudi/table/action/commit/BaseWriteHelper.java | 5 +++--
.../java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java | 3 +--
.../java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java | 3 +--
.../java/org/apache/hudi/table/action/commit/JavaWriteHelper.java | 3 +--
.../apache/hudi/common/table/read/BufferedRecordMergerFactory.java | 2 +-
6 files changed, 8 insertions(+), 10 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index fac3bea8be07..c90bbcb5172b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -556,7 +556,7 @@ public class HoodieIndexUtils {
orderingFieldNames,
writerSchema.get(),
Option.ofNullable(Pair.of(hoodieTable.getMetaClient().getTableConfig().getPayloadClass(),
hoodieTable.getConfig().getPayloadClass())),
- hoodieTable.getConfig().getProps(),
+ properties,
hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
String[] orderingFieldsArray = orderingFieldNames.toArray(new String[0]);
HoodieData<HoodieRecord<R>> taggedUpdatingRecords =
untaggedUpdatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 5d7e12217a80..fbe4d1cb3c62 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -115,6 +115,7 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
recordSchema = new Schema.Parser().parse(table.getConfig().getWriteSchema());
}
recordSchema = AvroSchemaCache.intern(recordSchema);
+ TypedProperties mergedProperties = readerContext.getMergeProps(table.getConfig().getProps());
BufferedRecordMerger<T> bufferedRecordMerger =
BufferedRecordMergerFactory.create(
readerContext,
readerContext.getMergeMode(),
@@ -123,14 +124,14 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
orderingFieldNames,
Option.ofNullable(table.getConfig().getPayloadClass()),
recordSchema,
- table.getConfig().getProps(),
+ mergedProperties,
tableConfig.getPartialUpdateMode());
return deduplicateRecords(
records,
table.getIndex(),
parallelism,
table.getConfig().getSchema(),
- table.getConfig().getProps(),
+ mergedProperties,
bufferedRecordMerger,
readerContext,
orderingFieldNames.toArray(new String[0]));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 94f2bbc83ec3..adfedf72db06 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -65,7 +65,6 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
RecordContext<T> recordContext = readerContext.getRecordContext();
- TypedProperties mergedProperties = readerContext.getMergeProps(props);
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
@@ -75,7 +74,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
// an instance of [[InternalRow]] pointing into shared, mutable buffer
return Pair.of(key, record.copy());
}).reduceByKey(
- (previous, next) -> reduceRecords(mergedProperties, recordMerger, orderingFieldNames, previous, next, schema.get(), recordContext),
+ (previous, next) -> reduceRecords(props, recordMerger, orderingFieldNames, previous, next, schema.get(), recordContext),
parallelism).map(Pair::getRight);
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 069730a5ae31..13a30b228fde 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -104,9 +104,8 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, Iterator<HoodieRe
// caution that the avro schema is not serializable
final Schema schema = new Schema.Parser().parse(schemaStr);
- TypedProperties mergedProperties = readerContext.getMergeProps(props);
return keyedRecords.values().stream().map(x ->
x.stream().reduce((previous, next) ->
- reduceRecords(mergedProperties, recordMerger, orderingFieldNames, previous, next, schema, readerContext.getRecordContext())
+ reduceRecords(props, recordMerger, orderingFieldNames, previous, next, schema, readerContext.getRecordContext())
).orElse(null)).filter(Objects::nonNull).iterator();
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index c1f81c2e1672..efed521190c9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -75,9 +75,8 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
}).collect(Collectors.groupingBy(Pair::getLeft));
final Schema schema = new Schema.Parser().parse(schemaStr);
- TypedProperties mergedProperties = readerContext.getMergeProps(props);
return keyedRecords.values().stream().map(x ->
x.stream().map(Pair::getRight).reduce((previous, next) ->
- reduceRecords(mergedProperties, recordMerger, orderingFieldNames, previous, next, schema, readerContext.getRecordContext())
+ reduceRecords(props, recordMerger, orderingFieldNames, previous, next, schema, readerContext.getRecordContext())
).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 5fd50ffcfd00..9e6429ff0d40 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -83,7 +83,7 @@ public class BufferedRecordMergerFactory {
}
// might need to introduce a merge config for the factory in the future to get rid of this.
- props = readerContext.getMergeProps(props);
+ // props = readerContext.getMergeProps(props);
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
if (partialUpdateModeOpt.isEmpty()) {