[ https://issues.apache.org/jira/browse/FLINK-28567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexander Trushev updated FLINK-28567:
--------------------------------------
Description:
h2. Context
Calcite has a JoinPushTransitivePredicatesRule that infers predicates from a Join and creates Filters on the Join's inputs when those predicates can be pushed down to them.
*Example.* (a0 = b0 AND a0 > 0) => (b0 > 0)
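To make the inference step concrete, here is a toy Java sketch. It is not Calcite's implementation, and the class TransitivePredicates and its records are hypothetical. It groups columns connected by equi-join conjuncts into equivalence classes using a small union-find. It then copies each column-vs-literal comparison to every other column in the same class. As far as we understand it, Calcite's rule obtains inferred predicates from its relational metadata layer. It also checks which input each predicate can be pushed to before creating a Filter. This sketch ignores join type and input boundaries.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of transitive predicate inference over a join condition.
// Hypothetical names; NOT Calcite's JoinPushTransitivePredicatesRule.
// Ignores join type and which input each column belongs to.
final class TransitivePredicates {

    // A column-vs-literal comparison such as a0 > 0.
    record Comparison(String column, String op, int literal) {
        @Override
        public String toString() {
            return column + " " + op + " " + literal;
        }
    }

    // An equi-join conjunct such as a0 = b0.
    record Equality(String left, String right) {}

    // Union-find "find" with path halving; registers unseen columns as their own root.
    private static String find(Map<String, String> parent, String x) {
        parent.putIfAbsent(x, x);
        while (!parent.get(x).equals(x)) {
            parent.put(x, parent.get(parent.get(x)));
            x = parent.get(x);
        }
        return x;
    }

    // Returns comparisons implied by the equalities that are not already present.
    static Set<Comparison> infer(List<Equality> equalities, List<Comparison> comparisons) {
        Map<String, String> parent = new HashMap<>();
        for (Equality e : equalities) {
            // Union: attach the left root under the right root.
            parent.put(find(parent, e.left()), find(parent, e.right()));
        }
        // Group every column seen in an equality by its equivalence-class root.
        Map<String, List<String>> classes = new HashMap<>();
        for (String column : new ArrayList<>(parent.keySet())) {
            classes.computeIfAbsent(find(parent, column), k -> new ArrayList<>()).add(column);
        }
        Set<Comparison> known = new LinkedHashSet<>(comparisons);
        Set<Comparison> inferred = new LinkedHashSet<>();
        for (Comparison c : comparisons) {
            // Copy the comparison onto every column equal to c.column().
            List<String> peers = classes.getOrDefault(find(parent, c.column()), List.of());
            for (String peer : peers) {
                Comparison candidate = new Comparison(peer, c.op(), c.literal());
                if (!known.contains(candidate)) {
                    inferred.add(candidate);
                }
            }
        }
        return inferred;
    }

    public static void main(String[] args) {
        // ON a0 = b0 AND a0 > 0
        Set<Comparison> result = infer(
                List.of(new Equality("a0", "b0")),
                List.of(new Comparison("a0", ">", 0)));
        System.out.println(result); // prints [b0 > 0]
    }
}
{code}

For a0 = b0 AND a0 > 0 it returns b0 > 0, matching the example. With a chain such as a0 = b0 AND b0 = c0, the same comparison propagates to both b0 and c0.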
h2. Proposal
Add org.apache.calcite.rel.rules.JoinPushTransitivePredicatesRule to
FlinkStreamRuleSets and to FlinkBatchRuleSets
h2. Benefit
Before the changes:
{code}
Flink SQL> explain select * from A join B on a0 = b0 and a0 > 0;
Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a0]])
:  +- Calc(select=[a0], where=[>(a0, 0)])
:     +- TableSourceScan(table=[[default_catalog, default_database, A, filter=[]]], fields=[a0])
+- Exchange(distribution=[hash[b0]])
   +- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b0])
{code}
After the changes (same query):
{code}
Join(joinType=[InnerJoin], where=[=(a0, b0)], select=[a0, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a0]])
:  +- Calc(select=[a0], where=[>(a0, 0)])
:     +- TableSourceScan(table=[[default_catalog, default_database, A, filter=[]]], fields=[a0])
+- Exchange(distribution=[hash[b0]])
   +- Calc(select=[b0], where=[>(b0, 0)])
      +- TableSourceScan(table=[[default_catalog, default_database, B, filter=[]]], fields=[b0])
{code}
i.e., b0 > 0 is inferred from the join condition and pushed down into B's input, so B is filtered before the Exchange.
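The following toy simulation illustrates why the extra Calc on B's side is both safe and useful for this inner join. The data is made up, and PushdownDemo is a hypothetical class, not Flink runtime code. Filtering B by the inferred b0 > 0 before the join gives the same join result, while fewer B rows reach the Exchange. The equivalence relies on the join being an inner join on a0 = b0. Outer joins need more care, and this sketch does not model them.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy simulation of the plans above (hypothetical class, made-up data):
// inner join A.a0 = B.b0 with a0 > 0, with and without the inferred
// filter b0 > 0 applied to B before the join.
final class PushdownDemo {

    record Pair(int a0, int b0) {}

    // Stands in for a Calc with a where-clause.
    static List<Integer> filter(List<Integer> rows, Predicate<Integer> p) {
        return rows.stream().filter(p).toList();
    }

    // Nested-loop inner join on a0 = b0.
    static List<Pair> innerJoin(List<Integer> left, List<Integer> right) {
        List<Pair> out = new ArrayList<>();
        for (int a : left) {
            for (int b : right) {
                if (a == b) {
                    out.add(new Pair(a, b));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = List.of(-1, 1, 2);
        List<Integer> b = List.of(-2, -1, 0, 1, 2, 3);
        Predicate<Integer> positive = x -> x > 0;

        List<Integer> leftIn = filter(a, positive);       // a0 > 0 on A (both plans)
        List<Integer> rightBefore = b;                     // before: B unfiltered
        List<Integer> rightAfter = filter(b, positive);    // after: inferred b0 > 0

        List<Pair> before = innerJoin(leftIn, rightBefore);
        List<Pair> after = innerJoin(leftIn, rightAfter);
        System.out.println(before.equals(after));                          // true
        System.out.println(rightBefore.size() + " -> " + rightAfter.size()); // 6 -> 3
    }
}
{code}

With this sample data, 6 rows of B reach the join before the change and 3 after. Both plans produce the same two result rows.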
> Introduce predicate inference from one side of join to the other
> ----------------------------------------------------------------
>
> Key: FLINK-28567
> URL: https://issues.apache.org/jira/browse/FLINK-28567
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: Alexander Trushev
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)