Dove created FLINK-32639:
----------------------------
Summary: Filter and Limit exist at the same time, limit cannot
take effect
Key: FLINK-32639
URL: https://issues.apache.org/jira/browse/FLINK-32639
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Dove
The environment is defined in Flink batch mode.
The source uses the filesystem connector (FileSystemTableSource implements
both SupportsLimitPushDown and SupportsFilterPushDown):
{code:java}
// Assumed setup: env was not defined in the original snippet.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
tabEnv.executeSql(
        "CREATE TABLE source(uuid varchar, name varchar, age int, ts timestamp, `partition` varchar) "
                + "WITH ( 'connector' = 'filesystem', 'path'='file:///tmp/file', 'format'='csv' "
                + ")");{code}
Case 1: Filter
{code:java}
tabEnv.executeSql("explain select * from source where name is null").print();
== Optimized Execution Plan ==
Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL])
+- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition]){code}
Case 2: Limit
{code:java}
tabEnv.executeSql("explain select * from source limit 10").print();
== Optimized Execution Plan ==
Limit(offset=[0], fetch=[10], global=[true])
+- Exchange(distribution=[single])
   +- Limit(offset=[0], fetch=[10], global=[false])
      +- TableSourceScan(table=[[default_catalog, default_database, source, limit=[10]]], fields=[uuid, name, age, ts, partition]){code}
Case 3: Filter + Limit
{code:java}
tabEnv.executeSql("explain select * from source where name is null limit 10").print();
== Optimized Execution Plan ==
Limit(offset=[0], fetch=[10], global=[true])
+- Exchange(distribution=[single])
   +- Limit(offset=[0], fetch=[10], global=[false])
      +- Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL])
         +- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition]){code}
When a filter is present, the limit does not appear to be pushed down to the
source: in Case 3 the TableSourceScan shows only filter=[IS NULL(name)] and no
limit=[10], unlike Case 2.
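
The difference between the Case 2 and Case 3 plans can also be checked mechanically. The following is only a minimal sketch, not part of Flink: the class PushDownCheck and its methods are hypothetical helpers, and it assumes the plan contains a single TableSourceScan node that is the last node in the plan text, as in the plans above. It looks for the limit=[...] and filter=[...] markers inside the scan node.

```java
final class PushDownCheck {

    /** Returns the plan text from the first TableSourceScan node onward, or "" if there is none. */
    public static String scanNode(String plan) {
        int start = plan.indexOf("TableSourceScan(");
        return start < 0 ? "" : plan.substring(start);
    }

    /** True if the scan node reports a pushed-down limit, e.g. limit=[10]. */
    public static boolean limitPushedDown(String plan) {
        return scanNode(plan).contains("limit=[");
    }

    /** True if the scan node reports a pushed-down filter, e.g. filter=[IS NULL(name)]. */
    public static boolean filterPushedDown(String plan) {
        return scanNode(plan).contains("filter=[");
    }

    public static void main(String[] args) {
        // Plan text copied from Case 2 (limit only).
        String limitOnly =
                "Limit(offset=[0], fetch=[10], global=[true])\n"
                        + "+- Exchange(distribution=[single])\n"
                        + "   +- Limit(offset=[0], fetch=[10], global=[false])\n"
                        + "      +- TableSourceScan(table=[[default_catalog, default_database, source, "
                        + "limit=[10]]], fields=[uuid, name, age, ts, partition])";

        // Plan text copied from Case 3 (filter + limit).
        String filterAndLimit =
                "Limit(offset=[0], fetch=[10], global=[true])\n"
                        + "+- Exchange(distribution=[single])\n"
                        + "   +- Limit(offset=[0], fetch=[10], global=[false])\n"
                        + "      +- Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], "
                        + "where=[name IS NULL])\n"
                        + "         +- TableSourceScan(table=[[default_catalog, default_database, source, "
                        + "filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition])";

        System.out.println("Case 2 limit pushed down:  " + limitPushedDown(limitOnly));       // true
        System.out.println("Case 3 limit pushed down:  " + limitPushedDown(filterAndLimit));  // false
        System.out.println("Case 3 filter pushed down: " + filterPushedDown(filterAndLimit)); // true
    }
}
```

In a real job the plan text would presumably come from tabEnv.explainSql(...) rather than from hard-coded strings; the string matching relies on the explain output format shown above and may need adjusting for other Flink versions.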