Alexander Trushev created FLINK-28530:
-----------------------------------------

             Summary: Improvement of conditions extraction that can be pushed 
into join inputs
                 Key: FLINK-28530
                 URL: https://issues.apache.org/jira/browse/FLINK-28530
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Alexander Trushev


Condition extraction was introduced for batch mode in FLINK-12509 and for 
stream mode in FLINK-24139.
h2. Proposal

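This ticket aims to replace the current extraction algorithm with a new one 
that also covers more complex cases with deeply nested predicates. Here each ai 
references only columns of one join input and each bi only columns of the other.
For all n > 0: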
((((((((a0 and b0) or a1) and b1) or a2) and b2) or a3) ... and bn-1) or an) => 
(a0 or a1 or ... or an)

*Example.* For n = 3 Flink does not extract (a0 or a1 or a2 or a3):
{code:java}
FlinkSQL> explain select * from A join B on (((((a0=0 and b0=0) or a1=0) and 
b1=0) or a2=0) and b2=0) or a3=0;

== Optimized Physical Plan ==
Join(joinType=[InnerJoin], where=[OR(AND(OR(AND(OR(AND(=(a0, 0), =(b0, 0)), 
=(a1, 0)), =(b1, 0)), =(a2, 0)), =(b2, 0)), =(a3, 0))], select=[a0, a1, a2, a3, 
a4, b0, b1, b2, b3, b4], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[single])
:  +- TableSourceScan(table=[[default_catalog, default_database, A]], 
fields=[a0, a1, a2, a3, a4])
+- Exchange(distribution=[single])
   +- TableSourceScan(table=[[default_catalog, default_database, B]], 
fields=[b0, b1, b2, b3, b4])
{code}
while PostgreSQL does:
{code:java}
postgres=# explain select * from A join B on ((((((a0=0 and b0=0) or a1=0) and 
b1=0) or a2=0) and b2=0) or a3=0);
                                                          QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------
 Nested Loop  (cost=0.00..1805.09 rows=14632 width=40)
   Join Filter: (((((((a.a0 = 0) AND (b.b0 = 0)) OR (a.a1 = 0)) AND (b.b1 = 0)) 
OR (a.a2 = 0)) AND (b.b2 = 0)) OR (a.a3 = 0))
   ->  Seq Scan on b  (cost=0.00..27.00 rows=1700 width=20)
   ->  Materialize  (cost=0.00..44.17 rows=34 width=20)
         ->  Seq Scan on a  (cost=0.00..44.00 rows=34 width=20)
               Filter: ((a0 = 0) OR (a1 = 0) OR (a2 = 0) OR (a3 = 0))
{code}
h2. Details

Pseudocode of the new algorithm, where:

f – predicate
rel – table (join input)
var(rel) – columns of rel; var(f) – columns referenced by f
{code:java}
extract(f, rel)
  if f = AND(left, right)
    return AND(extract(left, rel), extract(right, rel))
  if f = OR(left, right)
    return OR(extract(left, rel), extract(right, rel))
  if var(f) subsetOf var(rel)
    return f
  return True

AND(f, True) = AND(True, f) = f
OR(f, True) = OR(True, f) = True
{code}
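The pseudocode above can be sketched as a small runnable Python program. This is only an illustration, not Flink planner code: the classes Leaf, And, Or, TrueLit and the helpers make_and, make_or, show and eq0 are hypothetical names introduced here, and a predicate is modelled as a binary tree whose leaves carry the set of columns they reference. The sketch recurses into both the left and the right operand.

```python
from dataclasses import dataclass
from typing import FrozenSet, Union


@dataclass(frozen=True)
class Leaf:
    text: str                 # printable form, e.g. "a0=0"
    columns: FrozenSet[str]   # var(f): columns referenced by this leaf


@dataclass(frozen=True)
class And:
    left: "Pred"
    right: "Pred"


@dataclass(frozen=True)
class Or:
    left: "Pred"
    right: "Pred"


@dataclass(frozen=True)
class TrueLit:
    pass


Pred = Union[Leaf, And, Or, TrueLit]
TRUE = TrueLit()


def make_and(left: Pred, right: Pred) -> Pred:
    # AND(f, True) = AND(True, f) = f
    if left == TRUE:
        return right
    if right == TRUE:
        return left
    return And(left, right)


def make_or(left: Pred, right: Pred) -> Pred:
    # OR(f, True) = OR(True, f) = True
    if left == TRUE or right == TRUE:
        return TRUE
    return Or(left, right)


def extract(f: Pred, rel_columns: FrozenSet[str]) -> Pred:
    """Return the part of f that can be evaluated on rel alone."""
    if isinstance(f, And):
        return make_and(extract(f.left, rel_columns), extract(f.right, rel_columns))
    if isinstance(f, Or):
        return make_or(extract(f.left, rel_columns), extract(f.right, rel_columns))
    if isinstance(f, Leaf) and f.columns <= rel_columns:
        return f
    return TRUE


def show(f: Pred) -> str:
    if isinstance(f, Leaf):
        return f.text
    if isinstance(f, And):
        return f"({show(f.left)} AND {show(f.right)})"
    if isinstance(f, Or):
        return f"({show(f.left)} OR {show(f.right)})"
    return "TRUE"


def eq0(col: str) -> Leaf:
    return Leaf(f"{col}=0", frozenset({col}))


# The n = 3 predicate from the example in this ticket.
pred = Or(And(Or(And(Or(And(eq0("a0"), eq0("b0")), eq0("a1")), eq0("b1")),
                 eq0("a2")), eq0("b2")), eq0("a3"))
A = frozenset({"a0", "a1", "a2", "a3", "a4"})
B = frozenset({"b0", "b1", "b2", "b3", "b4"})

result_a = show(extract(pred, A))
result_b = show(extract(pred, B))
print(result_a)  # (((a0=0 OR a1=0) OR a2=0) OR a3=0)
print(result_b)  # TRUE
```

For table A the sketch yields the same disjunction that PostgreSQL pushes into the scan of a, and for table B it yields TRUE, i.e. nothing can be pushed down, which matches the unfiltered scan of b in the PostgreSQL plan.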
This algorithm covers deeply nested predicates and avoids conversion to CNF, 
which increases the length of the predicate to O(n * e^n) in the worst case.

The same recursive approach is used in [PostgreSQL 
orclauses.c|https://github.com/postgres/postgres/blob/164d174bbf9a3aba719c845497863cd3c49a3ad0/src/backend/optimizer/util/orclauses.c#L151-L252]
 and [Apache Spark 
predicates.scala|https://github.com/apache/spark/blob/v3.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L227-L272]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)