This is an automated email from the ASF dual-hosted git repository.
garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 29b79c9 [hotfix] Log the error message for creating table source first (#2711)
29b79c9 is described below
commit 29b79c99b02d66ef9b087b56223e74c0d1f99e94
Author: Danny Chan <[email protected]>
AuthorDate: Wed Mar 24 18:25:37 2021 +0800
[hotfix] Log the error message for creating table source first (#2711)
---
.../org/apache/hudi/table/HoodieTableFactory.java | 27 +++++++++++++++-------
1 file changed, 19 insertions(+), 8 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a2dac36..7ce8880 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -57,14 +58,24 @@ public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSin
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
- Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
- new ValidationException("Option [path] should be not empty.")));
- return new HoodieTableSource(
- schema,
- path,
- context.getTable().getPartitionKeys(),
- conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
- conf);
+ // enclosing the code within a try catch block so that we can log the error message.
+ // Flink 1.11 did a bad compatibility for the old table factory, it uses the old factory
+ // to create the source/sink and catches all the exceptions then tries the new factory.
+ //
+ // log the error message first so that there is a chance to show the real failure cause.
+ try {
+ Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
+ new ValidationException("Option [path] should not be empty.")));
+ return new HoodieTableSource(
+ schema,
+ path,
+ context.getTable().getPartitionKeys(),
+ conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
+ conf);
+ } catch (Throwable throwable) {
+ LOG.error("Create table source error", throwable);
+ throw new HoodieException(throwable);
+ }
}
@Override