This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 29b79c9  [hotfix] Log the error message for creating table source first (#2711)
29b79c9 is described below

commit 29b79c99b02d66ef9b087b56223e74c0d1f99e94
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Wed Mar 24 18:25:37 2021 +0800

    [hotfix] Log the error message for creating table source first (#2711)
---
 .../org/apache/hudi/table/HoodieTableFactory.java  | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a2dac36..7ce8880 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.util.AvroSchemaConverter;
 
@@ -57,14 +58,24 @@ public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSin
     Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
     TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
-    Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
-        new ValidationException("Option [path] should be not empty.")));
-    return new HoodieTableSource(
-        schema,
-        path,
-        context.getTable().getPartitionKeys(),
-        conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
-        conf);
+    // enclosing the code within a try catch block so that we can log the error message.
+    // Flink 1.11 did a bad compatibility for the old table factory, it uses the old factory
+    // to create the source/sink and catches all the exceptions then tries the new factory.
+    //
+    // log the error message first so that there is a chance to show the real failure cause.
+    try {
+      Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
+          new ValidationException("Option [path] should not be empty.")));
+      return new HoodieTableSource(
+          schema,
+          path,
+          context.getTable().getPartitionKeys(),
+          conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
+          conf);
+    } catch (Throwable throwable) {
+      LOG.error("Create table source error", throwable);
+      throw new HoodieException(throwable);
+    }
   }
 
   @Override

Reply via email to