[ 
https://issues.apache.org/jira/browse/FLINK-28567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-28567:
-----------------------------------
    Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Introduce predicate inference from one side of join to the other
> ----------------------------------------------------------------
>
>                 Key: FLINK-28567
>                 URL: https://issues.apache.org/jira/browse/FLINK-28567
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Alexander Trushev
>            Assignee: Alexander Trushev
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>
> h2. Context
> There is JoinPushTransitivePredicatesRule in Calcite that infers predicates 
> from a Join and creates Filters if those predicates can be pushed to its 
> inputs.
> *Example.* (a0 = b0 AND a0 > 0) => (b0 > 0)
> h2. Proposal
> Add org.apache.calcite.rel.rules.JoinPushTransitivePredicatesRule to 
> FlinkStreamRuleSets and to FlinkBatchRuleSets.
> h2. Benefit
> Before the changes:
> {code}
> Flink SQL> explain select * from A join B on a0 = b0 and a0 > 0;
> Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a0]])
> :  +- Calc(select=[a0], where=[>(a0, 0)])
> :     +- TableSourceScan(table=[[default_catalog, default_database, A, 
> filter=[]]], fields=[a0])
> +- Exchange(distribution=[hash[b0]])
>    +- TableSourceScan(table=[[default_catalog, default_database, B]], 
> fields=[b0])
> {code}
> After the changes:
> {code}
> Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a0]])
> :  +- Calc(select=[a0], where=[>(a0, 0)])
> :     +- TableSourceScan(table=[[default_catalog, default_database, A, 
> filter=[]]], fields=[a0])
> +- Exchange(distribution=[hash[b0]])
>    +- Calc(select=[b0], where=[>(b0, 0)])
>       +- TableSourceScan(table=[[default_catalog, default_database, B, 
> filter=[]]], fields=[b0])
> {code}
> i.e., b0 > 0 is inferred from a0 = b0 AND a0 > 0 and pushed down to the B 
> side of the join.
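
The inference in the example above, (a0 = b0 AND a0 > 0) => (b0 > 0), can be 
illustrated with a small standalone sketch. This is not Calcite's 
implementation and does not use the Calcite API; the class 
TransitiveInference and the helper types Equality and Comparison are 
hypothetical names introduced only for illustration. It assumes an inner join 
and performs a single substitution step over column equalities rather than a 
full transitive closure.

```java
import java.util.ArrayList;
import java.util.List;

public class TransitiveInference {
    /** Equi-join condition "left = right" (hypothetical helper type). */
    record Equality(String left, String right) {}

    /** Single-column comparison such as "a0 > 0" (hypothetical helper type). */
    record Comparison(String column, String op, String literal) {
        @Override
        public String toString() {
            return column + " " + op + " " + literal;
        }
    }

    /**
     * For every known comparison on a column, emit the same comparison on the
     * column it is equated with, skipping predicates that are already present.
     */
    static List<Comparison> infer(List<Equality> equalities, List<Comparison> known) {
        List<Comparison> inferred = new ArrayList<>();
        for (Comparison c : known) {
            for (Equality e : equalities) {
                String other = null;
                if (e.left().equals(c.column())) {
                    other = e.right();
                } else if (e.right().equals(c.column())) {
                    other = e.left();
                }
                if (other == null) {
                    continue; // this equality does not mention the column
                }
                Comparison candidate = new Comparison(other, c.op(), c.literal());
                if (!known.contains(candidate) && !inferred.contains(candidate)) {
                    inferred.add(candidate);
                }
            }
        }
        return inferred;
    }

    public static void main(String[] args) {
        // Join condition from the issue: a0 = b0 AND a0 > 0
        List<Equality> equalities = List.of(new Equality("a0", "b0"));
        List<Comparison> known = List.of(new Comparison("a0", ">", "0"));
        System.out.println(infer(equalities, known)); // prints [b0 > 0]
    }
}
```

In the planner, the inferred predicate becomes a filter on the other join 
input, which corresponds to the extra Calc(select=[b0], where=[>(b0, 0)]) node 
in the "after" plan.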



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
