[
https://issues.apache.org/jira/browse/FLINK-32639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dove updated FLINK-32639:
-------------------------
Affects Version/s: 1.18.0
> Filter and Limit exist at the same time, limit cannot take effect
> -----------------------------------------------------------------
>
> Key: FLINK-32639
> URL: https://issues.apache.org/jira/browse/FLINK-32639
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Dove
> Priority: Minor
>
> Define the environment using Flink Batch.
> The Source connector uses filesystem(FileSystemTableSource implements
> SupportsLimitPushDown/SupportsFilterPushDown)
>
> {code:java}
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
> tabEnv.executeSql(
> "CREATE TABLE source(uuid varchar, name varchar, age int, ts
> timestamp,`partition` varchar) "
> + "WITH ( 'connector' = 'filesystem',
> 'path'='file:///tmp/file', 'format'='csv' "
> + ")");{code}
> Case 1: Filter
>
> {code:java}
> tabEnv.executeSql("explain select * from source where name is null").print();
> == Optimized Execution Plan ==
> Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition],
> where=[name IS NULL])
> +- TableSourceScan(table=[[default_catalog, default_database, source,
> filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition]){code}
>
> Case 2: Limit
> {code:java}
> tabEnv.executeSql("explain select * from source limit 10").print();
> == Optimized Execution Plan ==
> Limit(offset=[0], fetch=[10], global=[true])
> +- Exchange(distribution=[single])
> +- Limit(offset=[0], fetch=[10], global=[false])
> +- TableSourceScan(table=[[default_catalog, default_database, source,
> limit=[10]]], fields=[uuid, name, age, ts, partition]) {code}
> Case 3: Filter + Limit
> {code:java}
> tabEnv.executeSql("explain select * from source where name is null limit
> 10").print();
> == Optimized Execution Plan ==
> Limit(offset=[0], fetch=[10], global=[true])
> +- Exchange(distribution=[single])
> +- Limit(offset=[0], fetch=[10], global=[false])
> +- Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts,
> partition], where=[name IS NULL])
> +- TableSourceScan(table=[[default_catalog, default_database,
> source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition])
> {code}
> When the Filter condition is in effect, Limit does not appear to be able to
> be pushed down to Source.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)