[ 
https://issues.apache.org/jira/browse/HUDI-1596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

张超明 updated HUDI-1596:
----------------------
    Summary: Throw NPE while reading configuration of parameter 'path'.  (was: 
NPE while reading configuration of parameter 'path'.)

> Throw NPE while reading configuration of parameter 'path'.
> ----------------------------------------------------------
>
>                 Key: HUDI-1596
>                 URL: https://issues.apache.org/jira/browse/HUDI-1596
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Flink Integration
>    Affects Versions: 0.8.0
>            Reporter: 张超明
>            Priority: Blocker
>
> h2. *Problem*
> The program throws an NPE while initializing writeClient.
> {code:java}
> public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   // Get configs from runtimeContext
>   cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>   writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
>   // writeClient
>   writeClient = new HoodieFlinkWriteClient<>(
>       new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)),
>       StreamerUtil.getHoodieClientConfig(cfg));
> }
> {code}
> The `cfg` does not contain the option 'path', which causes the NPE. The exception occurs at
> `.withPath(conf.getString(FlinkOptions.PATH))`.
> {code:java}
> public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
>   HoodieWriteConfig.Builder builder =
>       HoodieWriteConfig.newBuilder()
>           .withEngineType(EngineType.FLINK)
>           .withPath(conf.getString(FlinkOptions.PATH))
>           .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
>           .withCompactionConfig(
>               HoodieCompactionConfig.newBuilder()
>                   .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
>                   .build())
>           .forTable(conf.getString(FlinkOptions.TABLE_NAME))
>           .withAutoCommit(false)
>           .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
>   builder = builder.withSchema(getSourceSchema(conf).toString());
>   return builder.build();
> }{code}
> h2. *Solution*
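> Convert `cfg` to a Flink `Configuration` with `FlinkOptions.fromStreamerConfig(cfg)` before
> building the write config. Modify `open()` as follows: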
> {code:java}
> public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   // Get configs from runtimeContext
>   cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>   writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
>   // writeClient
>   final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); // Changes here
>   final HoodieWriteConfig hoodieClientConfig = StreamerUtil.getHoodieClientConfig(conf);
>   final HoodieFlinkEngineContext context =
>       new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null));
>   writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig);
> }
> {code}
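
The failure mode described above can be illustrated without Hudi or Flink on the classpath. The following is a minimal sketch, assuming the builder ends up storing the path in a `java.util.Properties`, which rejects null values (this is an assumption; the issue does not state where exactly the NPE originates). `PathConfigCheck`, `PATH_KEY`, `buildUnchecked`, and `buildChecked` are hypothetical names used only for illustration. It contrasts a bare NPE with a fail-fast check that names the missing option.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

class PathConfigCheck {
    // Hypothetical stand-in for the option key; not Hudi's actual constant.
    static final String PATH_KEY = "path";

    // Mimics a builder step that copies the path into java.util.Properties.
    // Properties does not accept null values, so a missing key causes an NPE.
    static Properties buildUnchecked(Map<String, String> conf) {
        Properties props = new Properties();
        props.setProperty(PATH_KEY, conf.get(PATH_KEY));
        return props;
    }

    // Same step, but validates the option first and reports what is missing.
    static Properties buildChecked(Map<String, String> conf) {
        String path = Objects.requireNonNull(
            conf.get(PATH_KEY), "Option 'path' is required but was not set");
        Properties props = new Properties();
        props.setProperty(PATH_KEY, path);
        return props;
    }

    public static void main(String[] args) {
        // No 'path' entry, as in the situation the report describes.
        Map<String, String> conf = new HashMap<>();

        try {
            buildUnchecked(conf);
        } catch (NullPointerException e) {
            System.out.println("unchecked: " + e);
        }

        try {
            buildChecked(conf);
        } catch (NullPointerException e) {
            System.out.println("checked: " + e.getMessage());
        }

        // Hypothetical table path, used only to show the successful case.
        conf.put(PATH_KEY, "/tmp/hudi_table");
        System.out.println("with path: " + buildChecked(conf).getProperty(PATH_KEY));
    }
}
```

A check of this kind does not replace the fix proposed in the issue; it only makes a missing 'path' easier to diagnose if the configuration is ever incomplete again.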



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
