This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c5220b9  [HUDI-1781] Fix Flink streaming reader throws ClassCastException (#2900)
c5220b9 is described below

commit c5220b96e94d13530f14cd4ea9ba04cd1e559474
Author: dijie <[email protected]>
AuthorDate: Sat May 1 19:13:15 2021 +0800

    [HUDI-1781] Fix Flink streaming reader throws ClassCastException (#2900)
---
 .../org/apache/hudi/table/HoodieTableSource.java   |  6 +++++-
 .../apache/hudi/table/HoodieDataSourceITCase.java  | 23 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index bc53c92..50420f9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -174,7 +174,11 @@ public class HoodieTableSource implements
         if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
           StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
               conf, FilePathUtils.toFlinkPath(path), metaClient, 
maxCompactionMemoryInBytes);
-          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory = StreamReadOperator.factory((MergeOnReadInputFormat) 
getInputFormat(true));
+          InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+          if (!(inputFormat instanceof MergeOnReadInputFormat)) {
+            throw new HoodieException("No successful commits under path " + 
path);
+          }
+          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
           SingleOutputStreamOperator<RowData> source = 
execEnv.addSource(monitoringFunction, "streaming_source")
               .setParallelism(1)
               .uid("uid_streaming_source")
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 3d413a7..69627f2 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -53,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -390,6 +392,27 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
     assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
   }
 
+  @ParameterizedTest
+  @EnumSource(value = ExecMode.class)
+  void testStreamReadEmptyTablePath(ExecMode execMode) throws Exception {
+    // create an empty table
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamerUtil.initTableIfNotExists(conf);
+
+    // create a flink source table
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.TABLE_TYPE.key(), 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    String createHoodieTable = 
TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(createHoodieTable);
+
+    // execute query and assert throws exception
+    assertThrows(HoodieException.class, () -> execSelectSql(streamTableEnv, 
"select * from t1", 10),
+            "No successful commits under path " + tempFile.getAbsolutePath());
+
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------

Reply via email to the mailing list.