Aleksey Plekhanov created IGNITE-18341:
------------------------------------------
Summary: Calcite engine. Introduce correlate based distribution
Key: IGNITE-18341
URL: https://issues.apache.org/jira/browse/IGNITE-18341
Project: Ignite
Issue Type: Improvement
Reporter: Aleksey Plekhanov
Assignee: Aleksey Plekhanov
To propagate a hash/affinity distribution through a relational node, all
distribution keys must be contained in the node. A hash distribution cannot
pass through a node that knows nothing about the distribution keys. For
example, a hash distribution can't pass through an aggregate if one or more of
the distribution keys are not among the grouping columns.
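A minimal sketch of this rule, assuming columns are identified by plain integer indexes. The class and method names ({{HashKeyPropagation}}, {{throughAggregate}}) are hypothetical illustrations and not part of the Ignite or Calcite API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical illustration, not Ignite code. */
public class HashKeyPropagation {
    /**
     * Tries to remap hash distribution keys (input column indexes) to the output
     * positions of an aggregate. groupCols lists the input columns used for grouping,
     * in output order. Returns empty if any distribution key is not a grouping column.
     */
    static Optional<List<Integer>> throughAggregate(List<Integer> distKeys, List<Integer> groupCols) {
        List<Integer> remapped = new ArrayList<>();

        for (int key : distKeys) {
            int pos = groupCols.indexOf(key);

            if (pos < 0)
                return Optional.empty(); // Key is lost, hash distribution can't be kept.

            remapped.add(pos);
        }

        return Optional.of(remapped);
    }

    public static void main(String[] args) {
        // Distribution key 0 is among grouping columns {0, 2}: distribution is kept.
        System.out.println(throughAggregate(List.of(0), List.of(0, 2)));

        // Aggregate without grouping columns (group=[{}]): distribution is lost.
        System.out.println(throughAggregate(List.of(0), List.of()));
    }
}
```

The second call corresponds to an aggregate with an empty group set, like the {{sum(b)}} subquery discussed in this ticket.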
Suppose, for example, we have two tables T1 and T2 colocated on fields T1.A and
T2.A, and the following query:
{code:java}
SELECT (SELECT sum(b) FROM t2 WHERE t2.a = t1.a) FROM t1
{code}
Hash distribution can't be used on the right side of the correlated nested loop
join, since the aggregate doesn't contain the required columns, and the plan for
such a query looks very inefficient:
{noformat}
IgniteProject(EXPR$0=[$3]), id = 219
  IgniteCorrelatedNestedLoopJoin(condition=[true], joinType=[left], variablesSet=[[$cor0]], correlationVariables=[[$cor0]]), id = 218
    IgniteExchange(distribution=[single]), id = 213
      IgniteTableScan(table=[[PUBLIC, T1]]), id = 84
    IgniteColocatedHashAggregate(group=[{}], SUM(B)=[SUM($0)]), id = 217
      IgniteProject(B=[$1]), id = 216
        IgniteHashIndexSpool(readType=[LAZY], writeType=[EAGER], searchRow=[[$cor0.A, null]], condition=[=($0, $cor0.A)], allowNulls=[false]), id = 215
          IgniteExchange(distribution=[single]), id = 214
            IgniteTableScan(table=[[PUBLIC, T2]], requiredColumns=[{0, 1}]), id = 112
{noformat}
If we look closer at the query, we can see that the filter {{t2.a = t1.a}} makes
this query colocated. If we run such a query on the H2 engine with the
{{colocatedJoin=false}} flag, it returns the correct result.
To work around this problem, I propose to introduce an artificial
"correlated distribution". This distribution will be produced on the right side
of a correlated nested loop join if the left side of the join has a hash
distribution (it will contain a reference to the correlate and the distribution
of this correlate), then passed through a set of nodes without modification,
and finally restored on a filter node as a hash distribution remapped to the
input operator fields (if the filter contains equality conditions between input
operator fields and correlated variable fields).
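The restore step on the filter node could be sketched roughly as follows. This is only an illustration of the idea under simplifying assumptions (fields are integer indexes, the filter is reduced to a list of equalities). All names here ({{CorrelatedDistribution}}, {{CorrelatedEquality}}, {{restoreOnFilter}}) are hypothetical and do not come from the Ignite code base:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical illustration of the proposal; names are not Ignite's API. */
public class CorrelatedDistributionSketch {
    /** Placeholder distribution: remembers the correlate and the hash keys of the correlate row. */
    static final class CorrelatedDistribution {
        final int correlateId;
        final List<Integer> correlateKeys;

        CorrelatedDistribution(int correlateId, List<Integer> correlateKeys) {
            this.correlateId = correlateId;
            this.correlateKeys = correlateKeys;
        }
    }

    /** Equality condition of the form "$inputField = $cor<correlateId>.<correlateField>". */
    static final class CorrelatedEquality {
        final int inputField;
        final int correlateId;
        final int correlateField;

        CorrelatedEquality(int inputField, int correlateId, int correlateField) {
            this.inputField = inputField;
            this.correlateId = correlateId;
            this.correlateField = correlateField;
        }
    }

    /**
     * Restores hash keys over the filter input: every hash key of the correlate must be
     * equated to some input field, otherwise the distribution can't be restored.
     */
    static Optional<List<Integer>> restoreOnFilter(CorrelatedDistribution dist, List<CorrelatedEquality> conds) {
        List<Integer> hashKeys = new ArrayList<>();

        for (int corKey : dist.correlateKeys) {
            Integer inputField = null;

            for (CorrelatedEquality c : conds) {
                if (c.correlateId == dist.correlateId && c.correlateField == corKey) {
                    inputField = c.inputField;

                    break;
                }
            }

            if (inputField == null)
                return Optional.empty(); // No equality for this key: not colocated.

            hashKeys.add(inputField);
        }

        return Optional.of(hashKeys);
    }

    public static void main(String[] args) {
        // T1 is hash-distributed by A (index 0 of $cor0); the filter is =($0, $cor0.A).
        CorrelatedDistribution dist = new CorrelatedDistribution(0, List.of(0));
        List<CorrelatedEquality> conds = List.of(new CorrelatedEquality(0, 0, 0));

        // Restored hash keys over T2 fields.
        System.out.println(restoreOnFilter(dist, conds));
    }
}
```

For the query above, the filter {{t2.a = t1.a}} equates T2 field 0 with the single hash key of the correlate, so the sketch restores a hash distribution on T2.A. A filter without such an equality yields no hash distribution.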
After such a change, the plan should look like this:
{noformat}
IgniteExchange(distribution=[single]), id = 283
  IgniteProject(EXPR$0=[$3]), id = 282
    IgniteCorrelatedNestedLoopJoin(condition=[true], joinType=[left], variablesSet=[[$cor0]], correlationVariables=[[$cor0]]), id = 281
      IgniteTableScan(table=[[PUBLIC, T1]]), id = 84
      IgniteColocatedHashAggregate(group=[{}], SUM(B)=[SUM($0)]), id = 280
        IgniteProject(B=[$1]), id = 279
          IgniteFilter(condition=[=($0, $cor0.A)]), id = 278
            IgniteTableScan(table=[[PUBLIC, T2]], requiredColumns=[{0, 1}]), id = 112
{noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)