张超明 created HUDI-1596: ------------------------- Summary: NPE while reading configuration of parameter 'path'. Key: HUDI-1596 URL: https://issues.apache.org/jira/browse/HUDI-1596 Project: Apache Hudi Issue Type: Bug Components: Flink Integration Affects Versions: 0.8.0 Reporter: 张超明
h2. *Problem* Program throws NPE during initilaize writeClient. {code:java} public void open(Configuration parameters) throws Exception { super.open(parameters); // Get configs from runtimeContext cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism(); // writeClient writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg)); } {code} The `cfg` does not have option 'path', which cause NPE. Exception occured on ` .withPath(conf.getString(FlinkOptions.PATH))`. {code:java} public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.FLINK) .withPath(conf.getString(FlinkOptions.PATH)) .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true) .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS)) .build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withAutoCommit(false) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); builder = builder.withSchema(getSourceSchema(conf).toString()); return builder.build(); }{code} h2. *Solution* Modify `open()` as below: {code:java} public void open(Configuration parameters) throws Exception { super.open(parameters); // Get configs from runtimeContext cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism(); // writeClient final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); final HoodieWriteConfig hoodieClientConfig = StreamerUtil.getHoodieClientConfig(conf); final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)); writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig); } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)