[ 
https://issues.apache.org/jira/browse/FLINK-32639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dove updated FLINK-32639:
-------------------------
    Affects Version/s: 1.18.0

> Filter and Limit exist at the same time, limit cannot take effect
> -----------------------------------------------------------------
>
>                 Key: FLINK-32639
>                 URL: https://issues.apache.org/jira/browse/FLINK-32639
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Dove
>            Priority: Minor
>
> Set up a Flink batch environment. The source uses the filesystem connector
> (FileSystemTableSource implements SupportsLimitPushDown and
> SupportsFilterPushDown).
>  
> {code:java}
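> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> // env is not defined in the report; a default execution environment is assumed.
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().inBatchMode().build();
> StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
> // Filesystem source table reading CSV files from a local path.
> tabEnv.executeSql(
>         "CREATE TABLE source(uuid varchar, name varchar, age int, "
>                 + "ts timestamp, `partition` varchar) "
>                 + "WITH ('connector' = 'filesystem', "
>                 + "'path' = 'file:///tmp/file', 'format' = 'csv')");{code}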
> Case 1: Filter
>  
> {code:java}
> tabEnv.executeSql("explain select * from source where name is null").print();
> == Optimized Execution Plan ==
> Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL])
> +- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition]){code}
>  
> Case 2: Limit 
> {code:java}
> tabEnv.executeSql("explain select * from source limit 10").print();
> == Optimized Execution Plan ==
> Limit(offset=[0], fetch=[10], global=[true])
> +- Exchange(distribution=[single])
>    +- Limit(offset=[0], fetch=[10], global=[false])
>       +- TableSourceScan(table=[[default_catalog, default_database, source, limit=[10]]], fields=[uuid, name, age, ts, partition]) {code}
> Case 3: Filter + Limit
> {code:java}
> tabEnv.executeSql("explain select * from source where name is null limit 10").print();
> == Optimized Execution Plan ==
> Limit(offset=[0], fetch=[10], global=[true])
> +- Exchange(distribution=[single])
>    +- Limit(offset=[0], fetch=[10], global=[false])
>       +- Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL])
>          +- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition])
> {code}
> When a filter is present, the limit does not appear to be pushed down to
> the source: in Case 3 the TableSourceScan shows filter=[IS NULL(name)] but
> no limit=[10], unlike Case 2.
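
The difference between Case 2 and Case 3 can also be checked mechanically. The following is a minimal sketch and not part of the original report: the class name PlanCheck and the method sourceScanHasLimit are hypothetical, and the check is a plain string match on the plan text quoted in the issue, not a Flink API. In a real job the plan text could be obtained with tabEnv.explainSql(...), which returns the plan as a String.

```java
import java.util.Arrays;

class PlanCheck {

    /**
     * Returns true if any TableSourceScan line in the plan text carries a
     * limit=[...] entry, i.e. the limit was pushed into the source.
     * This is a text match on the printed plan, not a planner API.
     */
    static boolean sourceScanHasLimit(String plan) {
        return Arrays.stream(plan.split("\n"))
                .filter(line -> line.contains("TableSourceScan("))
                .anyMatch(line -> line.contains("limit=["));
    }

    public static void main(String[] args) {
        // Case 2 from the report: limit only.
        String limitOnly =
                "Limit(offset=[0], fetch=[10], global=[true])\n"
                + "+- Exchange(distribution=[single])\n"
                + "   +- Limit(offset=[0], fetch=[10], global=[false])\n"
                + "      +- TableSourceScan(table=[[default_catalog, default_database, source, "
                + "limit=[10]]], fields=[uuid, name, age, ts, partition])";

        // Case 3 from the report: filter plus limit.
        String filterAndLimit =
                "Limit(offset=[0], fetch=[10], global=[true])\n"
                + "+- Exchange(distribution=[single])\n"
                + "   +- Limit(offset=[0], fetch=[10], global=[false])\n"
                + "      +- Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, "
                + "partition], where=[name IS NULL])\n"
                + "         +- TableSourceScan(table=[[default_catalog, default_database, source, "
                + "filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition])";

        System.out.println("Case 2 limit in source scan: " + sourceScanHasLimit(limitOnly));
        System.out.println("Case 3 limit in source scan: " + sourceScanHasLimit(filterAndLimit));
    }
}
```

On the plans quoted in the report, the check is true for Case 2 and false for Case 3. Note that the Calc node in Case 3 still carries where=[name IS NULL] above the scan.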



--
This message was sent by Atlassian Jira
(v8.20.10#820010)