wangxianghu commented on a change in pull request #2547:
URL: https://github.com/apache/hudi/pull/2547#discussion_r571849816
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
##########
@@ -77,7 +79,11 @@ public void open(Configuration parameters) throws Exception {
writeParallelSize =
getRuntimeContext().getExecutionConfig().getParallelism();
// writeClient
- writeClient = new HoodieFlinkWriteClient<>(new
HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)),
StreamerUtil.getHoodieClientConfig(cfg));
Review comment:
> You may see my change files. It obvious that the original version did
not invokes `FlinkOptions.fromStreamerConfig(conf)` firstly.
@ZhangChaoming Did you update your local code to the same as master? It
seems ok in my side
```
public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig
conf) {
return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
}
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.FLINK)
.withPath(conf.getString(FlinkOptions.PATH))
.combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS),
true)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
.build())
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withAutoCommit(false)
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
builder = builder.withSchema(getSourceSchema(conf).toString());
return builder.build();
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]