Aleksey Plekhanov created IGNITE-18341:
------------------------------------------

             Summary: Calcite engine. Introduce correlate based distribution
                 Key: IGNITE-18341
                 URL: https://issues.apache.org/jira/browse/IGNITE-18341
             Project: Ignite
          Issue Type: Improvement
            Reporter: Aleksey Plekhanov
            Assignee: Aleksey Plekhanov


To propagate hash/affinity distribution in relation node all distribution keys 
should be contained in the node. It's impossible to pass hash distribution 
through the node if node knows nothing about hash distribution keys. For 
example, hash distribution can't bypass aggregate if one or more of 
distribution keys is not contained in grouped columns.

Suppose, for example, we have two tables T1 and T2 colocated on fields T1.A and 
T2.A. The following query:

 
{code:java}
SELECT (SELECT sum(b) FROM t2 WHERE t2.a = t1.a) FROM t1 {code}
Hash distribution can't be used on the right side of the correlated nested loop 
join, since aggregate doesn't have required columns, and plan for such a query 
looks very ineffective:

 

 
{noformat}
IgniteProject(EXPR$0=[$3]), id = 219
  IgniteCorrelatedNestedLoopJoin(condition=[true], joinType=[left], 
variablesSet=[[$cor0]], correlationVariables=[[$cor0]]), id = 218
    IgniteExchange(distribution=[single]), id = 213
      IgniteTableScan(table=[[PUBLIC, T1]]), id = 84
    IgniteColocatedHashAggregate(group=[{}], SUM(B)=[SUM($0)]), id = 217
      IgniteProject(B=[$1]), id = 216
        IgniteHashIndexSpool(readType=[LAZY], writeType=[EAGER], 
searchRow=[[$cor0.A, null]], condition=[=($0, $cor0.A)], allowNulls=[false]), 
id = 215
          IgniteExchange(distribution=[single]), id = 214
            IgniteTableScan(table=[[PUBLIC, T2]], requiredColumns=[{0, 1}]), id 
= 112{noformat}
If we look closer to the query we can find that filter {{t2.a = t1.a}} makes 
this query colocated. If we run such a query on H2 engine with 
{{colocatedJoin=false}} flag it will return the correct result. 

To workaround such a problem I propose to introduce some kind of artificial 
"correlated distribution". This distribution will be produced on the right side 
of correlated nested loop join, if left side of the join has hash distribution 
(will contain reference to correlate and distribution of this correlate), than 
passed through set of nodes without modification and finally on filter node 
will be restored as hash distribution remaped to input operator fields (if 
filter contains equality conditions input operator fields and correlated 
variable fields).

After such a change plan should be looks like:
{noformat}
IgniteExchange(distribution=[single]), id = 283
  IgniteProject(EXPR$0=[$3]), id = 282
    IgniteCorrelatedNestedLoopJoin(condition=[true], joinType=[left], 
variablesSet=[[$cor0]], correlationVariables=[[$cor0]]), id = 281
      IgniteTableScan(table=[[PUBLIC, T1]]), id = 84
      IgniteColocatedHashAggregate(group=[{}], SUM(B)=[SUM($0)]), id = 280
        IgniteProject(B=[$1]), id = 279
          IgniteFilter(condition=[=($0, $cor0.A)]), id = 278
            IgniteTableScan(table=[[PUBLIC, T2]], requiredColumns=[{0, 1}]), id 
= 112
{noformat}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to