This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 02160c002 [bug][flink] Calculate splits with predicate in 
DataTableSource (#821)
02160c002 is described below

commit 02160c00256cbc98427050dac2583efc75d7876f
Author: Shammon FY <[email protected]>
AuthorDate: Tue Apr 4 16:11:12 2023 +0800

    [bug][flink] Calculate splits with predicate in DataTableSource (#821)
---
 .../java/org/apache/paimon/flink/source/DataTableSource.java |  8 +-------
 .../java/org/apache/paimon/flink/ReadWriteTableITCase.java   | 12 ++++++++++++
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index f0de0825a..e4b00e7cf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -34,10 +34,8 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
 import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
-import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -220,11 +218,7 @@ public class DataTableSource extends FlinkTableSource
             if (streaming) {
                 parallelism = options.get(CoreOptions.BUCKET);
             } else {
-
-                Preconditions.checkState(table instanceof DataTable);
-                DataTable dataTable = (DataTable) table;
-                splits = dataTable.newScan().plan().splits();
-
+                splits = 
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
                 if (null != splits) {
                     parallelism = splits.size();
                 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 46a674c5f..5c1699e9c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -1218,6 +1218,18 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                                             }
                                         })))
                 .isEqualTo(2);
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "WHERE currency='Euro'",
+                                        new HashMap<String, String>() {
+                                            {
+                                                
put(INFER_SCAN_PARALLELISM.key(), "true");
+                                            }
+                                        })))
+                .isEqualTo(1);
 
         // 2 splits and limit is 1, the parallelism is the limit value : 1
         assertThat(

Reply via email to