This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 50f7b7356ec [HUDI-6434] Fix IllegalArgumentException when do read_optimized read (#9048)
50f7b7356ec is described below
commit 50f7b7356ec372249be4fbca869a8c06dd74c139
Author: flashJd <[email protected]>
AuthorDate: Mon Jun 26 17:07:24 2023 +0800
[HUDI-6434] Fix IllegalArgumentException when do read_optimized read (#9048)
---
.../java/org/apache/hudi/table/HoodieTableSource.java | 4 ++++
.../org/apache/hudi/table/ITTestHoodieDataSource.java | 17 +++++++++++++++++
2 files changed, 21 insertions(+)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index de023824fbd..d1a3df771a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -515,6 +515,10 @@ public class HoodieTableSource implements
.map(HoodieBaseFile::getFileStatus)
.map(FileStatus::getPath).toArray(Path[]::new);
+ if (paths.length == 0) {
+ return InputFormats.EMPTY_INPUT_FORMAT;
+ }
+
return new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),
this.schema.getColumnNames().toArray(new String[0]),
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 80a78a81af3..5fbf44062a0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -304,6 +304,23 @@ public class ITTestHoodieDataSource {
}
}
+ @Test
+ void testStreamWriteBatchReadOptimizedWithoutCompaction() {
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+ .option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+ final String insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+ execInsertSql(streamTableEnv, insertInto);
+
+ List<Row> rows = CollectionUtil.iterableToList(
+ () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+ assertTrue(rows.isEmpty());
+ }
+
@Test
void testStreamWriteReadSkippingCompaction() throws Exception {
// create filesystem table named source