This is an automated email from the ASF dual-hosted git repository. garyli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 29b79c9 [hotfix] Log the error message for creating table source first (#2711) 29b79c9 is described below commit 29b79c99b02d66ef9b087b56223e74c0d1f99e94 Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Wed Mar 24 18:25:37 2021 +0800 [hotfix] Log the error message for creating table source first (#2711) --- .../org/apache/hudi/table/HoodieTableFactory.java | 27 +++++++++++++++------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index a2dac36..7ce8880 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.util.AvroSchemaConverter; @@ -57,14 +58,24 @@ public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSin Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions()); TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema); - Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> - new ValidationException("Option [path] should be not empty."))); - return new HoodieTableSource( - schema, - path, - context.getTable().getPartitionKeys(), - conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), - conf); + // enclosing the code within a try catch block so that we can log the error message. + // Flink 1.11 did a bad compatibility for the old table factory, it uses the old factory + // to create the source/sink and catches all the exceptions then tries the new factory. + // + // log the error message first so that there is a chance to show the real failure cause. + try { + Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> + new ValidationException("Option [path] should not be empty."))); + return new HoodieTableSource( + schema, + path, + context.getTable().getPartitionKeys(), + conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), + conf); + } catch (Throwable throwable) { + LOG.error("Create table source error", throwable); + throw new HoodieException(throwable); + } } @Override