This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eb921591562 [HUDI-5938] No need to init the properties for each data flushing (#8192)
eb921591562 is described below
commit eb921591562ae178031c406b48489c30fa587d06
Author: Danny Chan <[email protected]>
AuthorDate: Thu Mar 16 11:06:06 2023 +0800
[HUDI-5938] No need to init the properties for each data flushing (#8192)
The redundant per-flush properties initialization was introduced by #7345.
---
.../main/java/org/apache/hudi/sink/StreamWriteFunction.java | 10 ++--------
1 file changed, 2 insertions(+), 8 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 7f0285e83ed..c37b2325ca7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
@@ -53,7 +52,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Properties;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -431,10 +429,8 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has
no buffering records");
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
- Properties props = new Properties();
- config.addAllToProperties(props);
records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
- .deduplicateRecords(records, (HoodieIndex) null, -1,
this.writeClient.getConfig().getSchema(), props, recordMerger);
+ .deduplicateRecords(records, null, -1,
this.writeClient.getConfig().getSchema(),
this.writeClient.getConfig().getProps(), recordMerger);
}
bucket.preWrite(records);
final List<WriteStatus> writeStatus = new
ArrayList<>(writeFunction.apply(records, instant));
@@ -469,10 +465,8 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
List<HoodieRecord> records = bucket.writeBuffer();
if (records.size() > 0) {
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
- Properties props = new Properties();
- config.addAllToProperties(props);
records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
- .deduplicateRecords(records, (HoodieIndex) null, -1,
this.writeClient.getConfig().getSchema(), props, recordMerger);
+ .deduplicateRecords(records, null, -1,
this.writeClient.getConfig().getSchema(),
this.writeClient.getConfig().getProps(), recordMerger);
}
bucket.preWrite(records);
writeStatus.addAll(writeFunction.apply(records, currentInstant));