[ 
https://issues.apache.org/jira/browse/FLINK-28567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-28567:
--------------------------------------

    Assignee: Alexander Trushev

> Introduce predicate inference from one side of join to the other
> ----------------------------------------------------------------
>
>                 Key: FLINK-28567
>                 URL: https://issues.apache.org/jira/browse/FLINK-28567
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Alexander Trushev
>            Assignee: Alexander Trushev
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Context
> Calcite provides JoinPushTransitivePredicatesRule, which infers predicates 
> from a Join and creates Filters if those predicates can be pushed to its 
> inputs.
> *Example.* (a0 = b0 AND a0 > 0) => (b0 > 0)
> h2. Proposal
> Add org.apache.calcite.rel.rules.JoinPushTransitivePredicatesRule to 
> FlinkStreamRuleSets and to FlinkBatchRuleSets
> h2. Benefit
> Before the changes:
> {code}
> Flink SQL> explain select * from A join B on a0 = b0 and a0 > 0;
> Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a0]])
> :  +- Calc(select=[a0], where=[>(a0, 0)])
> :     +- TableSourceScan(table=[[default_catalog, default_database, A, 
> filter=[]]], fields=[a0])
> +- Exchange(distribution=[hash[b0]])
>    +- TableSourceScan(table=[[default_catalog, default_database, B]], 
> fields=[b0])
> {code}
> After the changes:
> {code}
> Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a0]])
> :  +- Calc(select=[a0], where=[>(a0, 0)])
> :     +- TableSourceScan(table=[[default_catalog, default_database, A, 
> filter=[]]], fields=[a0])
> +- Exchange(distribution=[hash[b0]])
>    +- Calc(select=[b0], where=[>(b0, 0)])
>       +- TableSourceScan(table=[[default_catalog, default_database, B, 
> filter=[]]], fields=[b0])
> {code}
> i.e., b0 > 0 is inferred and pushed down below the join, so B is filtered 
> before the Exchange.
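
The inference in the example, (a0 = b0 AND a0 > 0) => (b0 > 0), can be illustrated with a small standalone sketch. This is not Calcite's or Flink's implementation: the class TransitivePredicateSketch, its helper types Equality and Comparison, and the method inferForOtherSide are hypothetical names introduced only for illustration, and the sketch handles only single-column comparisons against an integer literal with one hop through an equi-join condition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TransitivePredicateSketch {

    /** Equi-join condition such as a0 = b0 (hypothetical helper type). */
    static final class Equality {
        final String left;
        final String right;

        Equality(String left, String right) {
            this.left = left;
            this.right = right;
        }
    }

    /** Comparison of one column with a literal, such as a0 > 0 (hypothetical helper type). */
    static final class Comparison {
        final String column;
        final String op;
        final int literal;

        Comparison(String column, String op, int literal) {
            this.column = column;
            this.op = op;
            this.literal = literal;
        }

        @Override
        public String toString() {
            return column + " " + op + " " + literal;
        }
    }

    /**
     * For every known comparison on a column that takes part in an equality,
     * emit the same comparison on the column at the other end of that equality.
     * Only one hop is followed; no transitive closure is computed.
     */
    static List<Comparison> inferForOtherSide(List<Equality> equalities, List<Comparison> known) {
        List<Comparison> inferred = new ArrayList<>();
        for (Comparison c : known) {
            for (Equality e : equalities) {
                if (e.left.equals(c.column)) {
                    inferred.add(new Comparison(e.right, c.op, c.literal));
                } else if (e.right.equals(c.column)) {
                    inferred.add(new Comparison(e.left, c.op, c.literal));
                }
            }
        }
        return inferred;
    }

    public static void main(String[] args) {
        // Join condition from the issue: a0 = b0 AND a0 > 0
        List<Comparison> inferred = inferForOtherSide(
                Arrays.asList(new Equality("a0", "b0")),
                Arrays.asList(new Comparison("a0", ">", 0)));
        System.out.println(inferred);
    }
}
```

The sketch only derives the extra predicate. In the plans above, the planner additionally places it as a Calc on the B input of the join.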


