[ 
https://issues.apache.org/jira/browse/FLINK-28530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-28530:
-----------------------------------
    Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is a Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Improvement of extraction of conditions that can be pushed into join inputs
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-28530
>                 URL: https://issues.apache.org/jira/browse/FLINK-28530
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Alexander Trushev
>            Priority: Major
>              Labels: pull-request-available, stale-major
>
> Condition extraction was introduced for batch mode in FLINK-12509 and for 
> stream mode in FLINK-24139.
> h2. Proposal
> This ticket aims to replace the current extraction algorithm with a new one 
> that also covers a more complex case, a deeply nested predicate:
> for all n > 0
> ((((((((a0 and b0) or a1) and b1) or a2) and b2) or a3) ... and bn-1) or an) 
> => (a0 or a1 or ... or an)
> *Example.* For n = 3 Flink does not extract (a0 or a1 or a2 or a3):
> {code:java}
> FlinkSQL> explain select * from A join B on (((((a0=0 and b0=0) or a1=0) and 
> b1=0) or a2=0) and b2=0) or a3=0;
> == Optimized Physical Plan ==
> Join(joinType=[InnerJoin], where=[OR(AND(OR(AND(OR(AND(=(a0, 0), =(b0, 0)), 
> =(a1, 0)), =(b1, 0)), =(a2, 0)), =(b2, 0)), =(a3, 0))], select=[a0, a1, a2, 
> a3, a4, b0, b1, b2, b3, b4], leftInputSpec=[NoUniqueKey], 
> rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[single])
> :  +- TableSourceScan(table=[[default_catalog, default_database, A]], 
> fields=[a0, a1, a2, a3, a4])
> +- Exchange(distribution=[single])
>    +- TableSourceScan(table=[[default_catalog, default_database, B]], 
> fields=[b0, b1, b2, b3, b4])
> {code}
> while PostgreSQL does:
> {code:java}
> postgres=# explain select * from A join B on ((((((a0=0 and b0=0) or a1=0) 
> and b1=0) or a2=0) and b2=0) or a3=0);
>                                                           QUERY PLAN
> ------------------------------------------------------------------------------------------------------------------------------
>  Nested Loop  (cost=0.00..1805.09 rows=14632 width=40)
>    Join Filter: (((((((a.a0 = 0) AND (b.b0 = 0)) OR (a.a1 = 0)) AND (b.b1 = 
> 0)) OR (a.a2 = 0)) AND (b.b2 = 0)) OR (a.a3 = 0))
>    ->  Seq Scan on b  (cost=0.00..27.00 rows=1700 width=20)
>    ->  Materialize  (cost=0.00..44.17 rows=34 width=20)
>          ->  Seq Scan on a  (cost=0.00..44.00 rows=34 width=20)
>                Filter: ((a0 = 0) OR (a1 = 0) OR (a2 = 0) OR (a3 = 0))
> {code}
> h2. Details
> Pseudocode of the new algorithm, where:
> f – predicate
> rel – table
> var(f), var(rel) – columns referenced by f and columns of rel
> {code:java}
> extract(f, rel)
>   if f = AND(left, right)
>     return AND(extract(left, rel), extract(right, rel))
>   if f = OR(left, right)
>     return OR(extract(left, rel), extract(right, rel))
>   if var(f) subsetOf var(rel)
>     return f
>   return True
> AND(f, True) = AND(True, f) = f
> OR(f, True) = OR(True, f) = True
> {code}
> This algorithm covers deeply nested predicates and does not rely on 
> conversion to CNF, which increases the length of the predicate to 
> O(n * e^n) in the worst case.
> The same recursive approach is used in PostgreSQL 
> [orclauses.c|https://github.com/postgres/postgres/blob/REL_14_4/src/backend/optimizer/util/orclauses.c#L151-L252]
>  and Apache Spark 
> [predicates.scala|https://github.com/apache/spark/blob/v3.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L227-L272]
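
The recursive extraction described in the pseudocode above can be sketched in Python as follows. This is only an illustration under stated assumptions, not the Flink planner code: the class names (Atom, And, Or, TrueLit) and the helpers (atom, render) are hypothetical, every atomic condition is assumed to carry the set of columns it references, and the simplification rules AND(f, True) = f and OR(f, True) = True are applied inline while recursing.

```python
from dataclasses import dataclass
from typing import FrozenSet, Union


@dataclass(frozen=True)
class Atom:
    """Atomic condition, e.g. a0=0, with the columns it references (hypothetical type)."""
    text: str
    columns: FrozenSet[str]


@dataclass(frozen=True)
class And:
    left: "Pred"
    right: "Pred"


@dataclass(frozen=True)
class Or:
    left: "Pred"
    right: "Pred"


@dataclass(frozen=True)
class TrueLit:
    """The always-true predicate."""


Pred = Union[Atom, And, Or, TrueLit]
TRUE = TrueLit()


def extract(f: Pred, rel_columns: FrozenSet[str]) -> Pred:
    """Return a predicate implied by f that only references rel_columns."""
    if isinstance(f, And):
        left = extract(f.left, rel_columns)
        right = extract(f.right, rel_columns)
        # AND(f, True) = AND(True, f) = f
        if left == TRUE:
            return right
        if right == TRUE:
            return left
        return And(left, right)
    if isinstance(f, Or):
        left = extract(f.left, rel_columns)
        right = extract(f.right, rel_columns)
        # OR(f, True) = OR(True, f) = True
        if left == TRUE or right == TRUE:
            return TRUE
        return Or(left, right)
    if isinstance(f, Atom) and f.columns <= rel_columns:
        return f
    return TRUE


def render(f: Pred) -> str:
    """Pretty-print a predicate with explicit parentheses."""
    if isinstance(f, Atom):
        return f.text
    if isinstance(f, TrueLit):
        return "TRUE"
    op = " AND " if isinstance(f, And) else " OR "
    return "(" + render(f.left) + op + render(f.right) + ")"


def atom(column: str) -> Atom:
    """Build the condition `column=0` used in the example from the ticket."""
    return Atom(f"{column}=0", frozenset({column}))


A_COLS = frozenset({"a0", "a1", "a2", "a3", "a4"})
B_COLS = frozenset({"b0", "b1", "b2", "b3", "b4"})

# (((((a0=0 and b0=0) or a1=0) and b1=0) or a2=0) and b2=0) or a3=0
pred = Or(And(Or(And(Or(And(atom("a0"), atom("b0")), atom("a1")),
                     atom("b1")), atom("a2")), atom("b2")), atom("a3"))

print(render(extract(pred, A_COLS)))  # (((a0=0 OR a1=0) OR a2=0) OR a3=0)
print(render(extract(pred, B_COLS)))  # TRUE
```

For the n = 3 join condition from the example, the sketch yields the disjunction over the columns of A and nothing for B, which matches the filter PostgreSQL places on the scan of a in the plan above. Each node of the predicate tree is visited once, so the cost is linear in the size of the predicate.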



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
