wangxianghu commented on a change in pull request #2547:
URL: https://github.com/apache/hudi/pull/2547#discussion_r571849816



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
##########
@@ -77,7 +79,11 @@ public void open(Configuration parameters) throws Exception {
     writeParallelSize = 
getRuntimeContext().getExecutionConfig().getParallelism();
 
     // writeClient
-    writeClient = new HoodieFlinkWriteClient<>(new 
HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), 
StreamerUtil.getHoodieClientConfig(cfg));

Review comment:
       > You may see my change files. It obvious that the original version did 
not invokes `FlinkOptions.fromStreamerConfig(conf)` firstly.
   
   @ZhangChaoming Did you update your local code? It seems ok on my side
   ```
     public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig 
conf) {
       return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
     }
   
     public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
       HoodieWriteConfig.Builder builder =
           HoodieWriteConfig.newBuilder()
               .withEngineType(EngineType.FLINK)
               .withPath(conf.getString(FlinkOptions.PATH))
               .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), 
true)
               .withCompactionConfig(
                   HoodieCompactionConfig.newBuilder()
                       
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
                       .build())
               .forTable(conf.getString(FlinkOptions.TABLE_NAME))
               .withAutoCommit(false)
               
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
   
       builder = builder.withSchema(getSourceSchema(conf).toString());
       return builder.build();
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to