This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 50f7b7356ec [HUDI-6434] Fix IllegalArgumentException when do 
read_optimized read (#9048)
50f7b7356ec is described below

commit 50f7b7356ec372249be4fbca869a8c06dd74c139
Author: flashJd <[email protected]>
AuthorDate: Mon Jun 26 17:07:24 2023 +0800

    [HUDI-6434] Fix IllegalArgumentException when do read_optimized read (#9048)
---
 .../java/org/apache/hudi/table/HoodieTableSource.java   |  4 ++++
 .../org/apache/hudi/table/ITTestHoodieDataSource.java   | 17 +++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index de023824fbd..d1a3df771a6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -515,6 +515,10 @@ public class HoodieTableSource implements
         .map(HoodieBaseFile::getFileStatus)
         .map(FileStatus::getPath).toArray(Path[]::new);
 
+    if (paths.length == 0) {
+      return InputFormats.EMPTY_INPUT_FORMAT;
+    }
+
     return new CopyOnWriteInputFormat(
         FilePathUtils.toFlinkPaths(paths),
         this.schema.getColumnNames().toArray(new String[0]),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 80a78a81af3..5fbf44062a0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -304,6 +304,23 @@ public class ITTestHoodieDataSource {
     }
   }
 
+  @Test
+  void testStreamWriteBatchReadOptimizedWithoutCompaction() {
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.QUERY_TYPE, 
FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    final String insertInto = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+    execInsertSql(streamTableEnv, insertInto);
+
+    List<Row> rows = CollectionUtil.iterableToList(
+        () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+    assertTrue(rows.isEmpty());
+  }
+
   @Test
   void testStreamWriteReadSkippingCompaction() throws Exception {
     // create filesystem table named source

Reply via email to