This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0d0a4152cf [HUDI-4611] Fix the duplicate creation of config in HoodieFlinkStreamer (#6369)
0d0a4152cf is described below
commit 0d0a4152cfd362185066519ae926ac4513c7a152
Author: feiyang_deepnova <[email protected]>
AuthorDate: Fri Aug 12 11:24:56 2022 +0800
[HUDI-4611] Fix the duplicate creation of config in HoodieFlinkStreamer (#6369)
Co-authored-by: linfey <[email protected]>
---
.../src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java | 4 ++--
.../hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java | 4 ----
2 files changed, 2 insertions(+), 6 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 013753b6d9..29f55f78ac 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -69,12 +69,12 @@ public class HoodieFlinkStreamer {
TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));
+ Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
// Read from kafka source
RowType rowType =
- (RowType)
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
+ (RowType)
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
- Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
int parallelism = env.getParallelism();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e5e30c1c98..4b93faeaf7 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -116,10 +116,6 @@ public class StreamerUtil {
new Path(cfg.propsFilePath), cfg.configs).getProps();
}
- public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
- return new
FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema();
- }
-
public static Schema
getSourceSchema(org.apache.flink.configuration.Configuration conf) {
if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
return new FilebasedSchemaProvider(conf).getSourceSchema();