[ 
https://issues.apache.org/jira/browse/HUDI-1596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

张超明 updated HUDI-1596:
----------------------
    Description: 
h2. *Problem*

The program throws an NPE while initializing the writeClient.
{code:java}
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  // Get configs from runtimeContext
  cfg = (FlinkStreamerConfig) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

  writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();

  // writeClient
  writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new 
FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));
}
{code}
The `cfg` does not have the option 'PATH', which causes the NPE. The exception occurred on 
`.withPath(conf.getString(FlinkOptions.PATH))`.
{code:java}
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
  HoodieWriteConfig.Builder builder =
      HoodieWriteConfig.newBuilder()
          .withEngineType(EngineType.FLINK)
          .withPath(conf.getString(FlinkOptions.PATH))
          .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
          .withCompactionConfig(
              HoodieCompactionConfig.newBuilder()
                  .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
                  .build())
          .forTable(conf.getString(FlinkOptions.TABLE_NAME))
          .withAutoCommit(false)
          .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));

  builder = builder.withSchema(getSourceSchema(conf).toString());
  return builder.build();
}{code}
In the following step, the `basePath` is `null`. More importantly, 
`Properties` (`props`) extends `Hashtable`, which throws an NPE when it receives a 
`null` value.
{code:java}
public Builder withPath(String basePath) {
  props.setProperty(BASE_PATH_PROP, basePath);
  return this;
}
{code}
h2. *Solution*

Modify `open()` as below:
{code:java}
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  // Get configs from runtimeContext
  cfg = (FlinkStreamerConfig) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

  writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();

  // writeClient 
  final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); // Changes 
here
  final HoodieWriteConfig hoodieClientConfig = 
StreamerUtil.getHoodieClientConfig(conf);
  final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new 
FlinkTaskContextSupplier(null));

  writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig);
}
{code}

  was:
h2. *Problem*

Program throws NPE during initilaize writeClient.
{code:java}
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  // Get configs from runtimeContext
  cfg = (FlinkStreamerConfig) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

  writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();

  // writeClient
  writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new 
FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));
}
{code}
The `cfg` does not have option 'PATH', which causes NPE. Exception occured on 
`.withPath(conf.getString(FlinkOptions.PATH))`.
{code:java}
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
  HoodieWriteConfig.Builder builder =
      HoodieWriteConfig.newBuilder()
          .withEngineType(EngineType.FLINK)
          .withPath(conf.getString(FlinkOptions.PATH))
          .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
          .withCompactionConfig(
              HoodieCompactionConfig.newBuilder()
                  .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
                  .build())
          .forTable(conf.getString(FlinkOptions.TABLE_NAME))
          .withAutoCommit(false)
          .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));

  builder = builder.withSchema(getSourceSchema(conf).toString());
  return builder.build();
}{code}
On the following step, the basePath is `null`. More importantly, the 
Properties(`props`)  extends HashTable, which will throw NPE when recieve a 
`null` value.
{code:java}
public Builder withPath(String basePath) {
  props.setProperty(BASE_PATH_PROP, basePath);
  return this;
}
{code}
h2. *Solution*

Modify `open()` as below:
{code:java}
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  // Get configs from runtimeContext
  cfg = (FlinkStreamerConfig) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

  writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();

  // writeClient 
  final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); // Changes 
here
  final HoodieWriteConfig hoodieClientConfig = 
StreamerUtil.getHoodieClientConfig(conf);
  final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new 
FlinkTaskContextSupplier(null));

  writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig);
}
{code}


> Throw NPE while reading configuration of parameter 'PATH'.
> ----------------------------------------------------------
>
>                 Key: HUDI-1596
>                 URL: https://issues.apache.org/jira/browse/HUDI-1596
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Flink Integration
>    Affects Versions: 0.8.0
>            Reporter: 张超明
>            Priority: Blocker
>              Labels: pull-request-available
>
> h2. *Problem*
> The program throws an NPE while initializing the writeClient.
> {code:java}
> public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   // Get configs from runtimeContext
>   cfg = (FlinkStreamerConfig) 
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>   writeParallelSize = 
> getRuntimeContext().getExecutionConfig().getParallelism();
>   // writeClient
>   writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new 
> FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));
> }
> {code}
> The `cfg` does not have the option 'PATH', which causes the NPE. The exception occurred on 
> `.withPath(conf.getString(FlinkOptions.PATH))`.
> {code:java}
> public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
>   HoodieWriteConfig.Builder builder =
>       HoodieWriteConfig.newBuilder()
>           .withEngineType(EngineType.FLINK)
>           .withPath(conf.getString(FlinkOptions.PATH))
>           .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
>           .withCompactionConfig(
>               HoodieCompactionConfig.newBuilder()
>                   
> .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
>                   .build())
>           .forTable(conf.getString(FlinkOptions.TABLE_NAME))
>           .withAutoCommit(false)
>           
> .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
>   builder = builder.withSchema(getSourceSchema(conf).toString());
>   return builder.build();
> }{code}
> In the following step, the `basePath` is `null`. More importantly, 
> `Properties` (`props`) extends `Hashtable`, which throws an NPE when it receives a 
> `null` value.
> {code:java}
> public Builder withPath(String basePath) {
>   props.setProperty(BASE_PATH_PROP, basePath);
>   return this;
> }
> {code}
> h2. *Solution*
> Modify `open()` as below:
> {code:java}
> public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   // Get configs from runtimeContext
>   cfg = (FlinkStreamerConfig) 
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>   writeParallelSize = 
> getRuntimeContext().getExecutionConfig().getParallelism();
>   // writeClient 
>   final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); // Changes 
> here
>   final HoodieWriteConfig hoodieClientConfig = 
> StreamerUtil.getHoodieClientConfig(conf);
>   final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new 
> FlinkTaskContextSupplier(null));
>   writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to