This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 02160c002 [bug][flink] Calculate splits with predicate in DataTableSource (#821)
02160c002 is described below
commit 02160c00256cbc98427050dac2583efc75d7876f
Author: Shammon FY <[email protected]>
AuthorDate: Tue Apr 4 16:11:12 2023 +0800
[bug][flink] Calculate splits with predicate in DataTableSource (#821)
---
.../java/org/apache/paimon/flink/source/DataTableSource.java | 8 +-------
.../java/org/apache/paimon/flink/ReadWriteTableITCase.java | 12 ++++++++++++
2 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index f0de0825a..e4b00e7cf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -34,10 +34,8 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
-import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -220,11 +218,7 @@ public class DataTableSource extends FlinkTableSource
if (streaming) {
parallelism = options.get(CoreOptions.BUCKET);
} else {
-
- Preconditions.checkState(table instanceof DataTable);
- DataTable dataTable = (DataTable) table;
- splits = dataTable.newScan().plan().splits();
-
+ splits =
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
if (null != splits) {
parallelism = splits.size();
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 46a674c5f..5c1699e9c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -1218,6 +1218,18 @@ public class ReadWriteTableITCase extends AbstractTestBase {
}
})))
.isEqualTo(2);
+ assertThat(
+ sourceParallelism(
+ buildQueryWithTableOptions(
+ table,
+ "*",
+ "WHERE currency='Euro'",
+ new HashMap<String, String>() {
+ {
+
put(INFER_SCAN_PARALLELISM.key(), "true");
+ }
+ })))
+ .isEqualTo(1);
// 2 splits and limit is 1, the parallelism is the limit value : 1
assertThat(