This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c5220b9 [HUDI-1781] Fix Flink streaming reader throws
ClassCastException (#2900)
c5220b9 is described below
commit c5220b96e94d13530f14cd4ea9ba04cd1e559474
Author: dijie <[email protected]>
AuthorDate: Sat May 1 19:13:15 2021 +0800
[HUDI-1781] Fix Flink streaming reader throws ClassCastException (#2900)
---
.../org/apache/hudi/table/HoodieTableSource.java | 6 +++++-
.../apache/hudi/table/HoodieDataSourceITCase.java | 23 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index bc53c92..50420f9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -174,7 +174,11 @@ public class HoodieTableSource implements
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new
StreamReadMonitoringFunction(
conf, FilePathUtils.toFlinkPath(path), metaClient,
maxCompactionMemoryInBytes);
- OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData>
factory = StreamReadOperator.factory((MergeOnReadInputFormat)
getInputFormat(true));
+ InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+ if (!(inputFormat instanceof MergeOnReadInputFormat)) {
+ throw new HoodieException("No successful commits under path " +
path);
+ }
+ OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData>
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source =
execEnv.addSource(monitoringFunction, "streaming_source")
.setParallelism(1)
.uid("uid_streaming_source")
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 3d413a7..69627f2 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -53,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -390,6 +392,27 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
}
+ @ParameterizedTest
+ @EnumSource(value = ExecMode.class)
+ void testStreamReadEmptyTablePath(ExecMode execMode) throws Exception {
+ // create an empty table
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ StreamerUtil.initTableIfNotExists(conf);
+
+ // create a flink source table
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+ options.put(FlinkOptions.TABLE_TYPE.key(),
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ String createHoodieTable =
TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ streamTableEnv.executeSql(createHoodieTable);
+
+ // execute query and assert throws exception
+ assertThrows(HoodieException.class, () -> execSelectSql(streamTableEnv,
"select * from t1", 10),
+ "No successful commits under path " + tempFile.getAbsolutePath());
+
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------