[ https://issues.apache.org/jira/browse/HUDI-1596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
张超明 updated HUDI-1596: ---------------------- Description: h2. *Problem* The program throws an NPE while initializing writeClient. {code:java} public void open(Configuration parameters) throws Exception { super.open(parameters); // Get configs from runtimeContext cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism(); // writeClient writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg)); } {code} The `cfg` does not contain the option 'PATH', which causes the NPE. The exception occurred at `.withPath(conf.getString(FlinkOptions.PATH))`. {code:java} public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.FLINK) .withPath(conf.getString(FlinkOptions.PATH)) .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true) .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS)) .build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withAutoCommit(false) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); builder = builder.withSchema(getSourceSchema(conf).toString()); return builder.build(); }{code} In the next step, `basePath` is `null`. More importantly, `props` is a `Properties` (which extends `Hashtable`), so it throws an NPE when it receives a `null` value. {code:java} public Builder withPath(String basePath) { props.setProperty(BASE_PATH_PROP, basePath); return this; } {code} h2. 
*Solution* Modify `open()` as below: {code:java} public void open(Configuration parameters) throws Exception { super.open(parameters); // Get configs from runtimeContext cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism(); // writeClient final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); // Changes here final HoodieWriteConfig hoodieClientConfig = StreamerUtil.getHoodieClientConfig(conf); final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)); writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig); } {code} was: h2. *Problem* The program throws an NPE while initializing writeClient. {code:java} public void open(Configuration parameters) throws Exception { super.open(parameters); // Get configs from runtimeContext cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism(); // writeClient writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg)); } {code} The `cfg` does not contain the option 'PATH', which causes the NPE. The exception occurred at `.withPath(conf.getString(FlinkOptions.PATH))`. 
{code:java} public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.FLINK) .withPath(conf.getString(FlinkOptions.PATH)) .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true) .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS)) .build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withAutoCommit(false) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); builder = builder.withSchema(getSourceSchema(conf).toString()); return builder.build(); }{code} In the next step, basePath is `null`. More importantly, `props` is a `Properties` (which extends `Hashtable`), so it throws an NPE when it receives a `null` value. {code:java} public Builder withPath(String basePath) { props.setProperty(BASE_PATH_PROP, basePath); return this; } {code} h2. *Solution* Modify `open()` as below: {code:java} public void open(Configuration parameters) throws Exception { super.open(parameters); // Get configs from runtimeContext cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism(); // writeClient final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); // Changes here final HoodieWriteConfig hoodieClientConfig = StreamerUtil.getHoodieClientConfig(conf); final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)); writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig); } {code} > Throw NPE while reading configuration of parameter 'PATH'. 
> ---------------------------------------------------------- > > Key: HUDI-1596 > URL: https://issues.apache.org/jira/browse/HUDI-1596 > Project: Apache Hudi > Issue Type: Bug > Components: Flink Integration > Affects Versions: 0.8.0 > Reporter: 张超明 > Priority: Blocker > Labels: pull-request-available > > h2. *Problem* > The program throws an NPE while initializing writeClient. > {code:java} > public void open(Configuration parameters) throws Exception { > super.open(parameters); > // Get configs from runtimeContext > cfg = (FlinkStreamerConfig) > getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); > writeParallelSize = > getRuntimeContext().getExecutionConfig().getParallelism(); > // writeClient > writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new > FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg)); > } > {code} > The `cfg` does not contain the option 'PATH', which causes the NPE. The exception occurred at > `.withPath(conf.getString(FlinkOptions.PATH))`. > {code:java} > public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { > HoodieWriteConfig.Builder builder = > HoodieWriteConfig.newBuilder() > .withEngineType(EngineType.FLINK) > .withPath(conf.getString(FlinkOptions.PATH)) > .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true) > .withCompactionConfig( > HoodieCompactionConfig.newBuilder() > > .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS)) > .build()) > .forTable(conf.getString(FlinkOptions.TABLE_NAME)) > .withAutoCommit(false) > > .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); > builder = builder.withSchema(getSourceSchema(conf).toString()); > return builder.build(); > }{code} > In the next step, `basePath` is `null`. More importantly, `props` is a > `Properties` (which extends `Hashtable`), so it throws an NPE when it receives a > `null` value. 
> {code:java} > public Builder withPath(String basePath) { > props.setProperty(BASE_PATH_PROP, basePath); > return this; > } > {code} > h2. *Solution* > Modify `open()` as below: > {code:java} > public void open(Configuration parameters) throws Exception { > super.open(parameters); > // Get configs from runtimeContext > cfg = (FlinkStreamerConfig) > getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); > writeParallelSize = > getRuntimeContext().getExecutionConfig().getParallelism(); > // writeClient > final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); // Changes > here > final HoodieWriteConfig hoodieClientConfig = > StreamerUtil.getHoodieClientConfig(conf); > final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new > FlinkTaskContextSupplier(null)); > writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)