This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eb921591562 [HUDI-5938] No need to init the properties for each data flushing (#8192)
eb921591562 is described below

commit eb921591562ae178031c406b48489c30fa587d06
Author: Danny Chan <[email protected]>
AuthorDate: Thu Mar 16 11:06:06 2023 +0800

    [HUDI-5938] No need to init the properties for each data flushing (#8192)
    
    Introduced by #7345
---
 .../main/java/org/apache/hudi/sink/StreamWriteFunction.java    | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 7f0285e83ed..c37b2325ca7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.table.action.commit.FlinkWriteHelper;
@@ -53,7 +52,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Properties;
 import java.util.Random;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -431,10 +429,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
-      Properties props = new Properties();
-      config.addAllToProperties(props);
       records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
-          .deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
+          .deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
     }
     bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
@@ -469,10 +465,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
             List<HoodieRecord> records = bucket.writeBuffer();
             if (records.size() > 0) {
               if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
-                Properties props = new Properties();
-                config.addAllToProperties(props);
                 records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
-                    .deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
+                    .deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
               }
               bucket.preWrite(records);
               writeStatus.addAll(writeFunction.apply(records, currentInstant));

Reply via email to