Tianyi Wang created IMPALA-6213:
-----------------------------------
Summary: The partitioning compatibility check is wrong in
consecutive outer join cases
Key: IMPALA-6213
URL: https://issues.apache.org/jira/browse/IMPALA-6213
Project: IMPALA
Issue Type: Bug
Components: Frontend
Affects Versions: Impala 2.10.0
Reporter: Tianyi Wang
Currently, createAnalyticFragment() and createMergeAggregationFragment() use the
child fragment's partitioning info and refsNullableTupleId() to determine whether
the child fragment's partitioning can be adopted directly, without an extra
exchange.
It is wrong because:
# The output partition of an outer join node is always assigned its lhs input
partition, which is not correct for full/right outer joins.
# refsNullableTupleId() seems to be designed to handle the outer join case, but
it is not sufficient, as the query below shows.
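To make the first point concrete, here is a minimal sketch in Python (not Impala's Java frontend code) of one possible rule for the output partition of a partitioned hash join. The names JoinType and join_output_partition are hypothetical and do not exist in Impala. The rule for full outer joins is an assumption based on the reasoning that unmatched rows from either side carry NULLs for the other side's expressions.

```python
from enum import Enum
from typing import Optional, Tuple


class JoinType(Enum):
    INNER = "inner"
    LEFT_OUTER = "left outer"
    RIGHT_OUTER = "right outer"
    FULL_OUTER = "full outer"


def join_output_partition(
    join_type: JoinType,
    lhs_exprs: Tuple[str, ...],
    rhs_exprs: Tuple[str, ...],
) -> Optional[Tuple[str, ...]]:
    """Hypothetical helper: return the exprs on which the output of a
    partitioned hash join is still hash-partitioned, or None if neither
    input partition can be assumed for the output."""
    if join_type in (JoinType.INNER, JoinType.LEFT_OUTER):
        # Every output row carries the lhs row that placed it in its partition.
        return lhs_exprs
    if join_type is JoinType.RIGHT_OUTER:
        # Unmatched rhs rows have NULL lhs exprs, so only the rhs exprs
        # still describe where each row lives.
        return rhs_exprs
    # FULL OUTER: unmatched rows from both sides have NULLs on the other
    # side, so neither set of exprs describes the output (assumption).
    return None
```

Under this rule, join 04 in the plan below would report HASH(t3.id) rather than HASH(t2.id) as its output partition.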
Given the query
{noformat}
select /* +straight_join */ t2.id, count(*)
from functional.alltypes t1
left outer join /* +shuffle */ functional.alltypessmall t2
on t1.int_col = t2.int_col
right outer join /* +shuffle */ functional.alltypestiny t3
on t2.id = t3.id
group by t2.id
{noformat}
impala@3ddafcd29505614a01c8f4362396635c84ab4052 generates the following plan:
{noformat}
+--------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=5.81MB |
| Per-Host Resource Estimates: Memory=205.88MB |
| Codegen disabled by planner |
| |
| PLAN-ROOT SINK |
| | |
| 10:EXCHANGE [UNPARTITIONED] |
| | |
| 05:AGGREGATE [FINALIZE] |
| | output: count(*) |
| | group by: t2.id |
| | |
| 04:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED] |
| | hash predicates: t2.id = t3.id |
| | runtime filters: RF000 <- t3.id |
| | |
| |--09:EXCHANGE [HASH(t3.id)] |
| | | |
| | 02:SCAN HDFS [functional.alltypestiny t3] |
| | partitions=4/4 files=4 size=460B |
| | |
| 08:EXCHANGE [HASH(t2.id)] |
| | |
| 03:HASH JOIN [LEFT OUTER JOIN, PARTITIONED] |
| | hash predicates: t1.int_col = t2.int_col |
| | |
| |--07:EXCHANGE [HASH(t2.int_col)] |
| | | |
| | 01:SCAN HDFS [functional.alltypessmall t2] |
| | partitions=4/4 files=4 size=6.32KB |
| | runtime filters: RF000 -> t2.id |
| | |
| 06:EXCHANGE [HASH(t1.int_col)] |
| | |
| 00:SCAN HDFS [functional.alltypes t1] |
| partitions=24/24 files=24 size=478.45KB |
+--------------------------------------------------+
{noformat}
This plan is wrong because rows with t2.id = NULL can appear in any partition
after the right outer join (04), so it is incorrect to aggregate on t2.id (05)
without an exchange in between.
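The effect can be reproduced with a toy simulation. The sketch below is plain Python, not Impala code. The partition count, the modulo "hash" function, and the sample ids are all made up for illustration. It runs a partitioned right outer join on t2.id = t3.id and then compares count(*) group by t2.id with and without an exchange on t2.id.

```python
from collections import Counter

NUM_PARTITIONS = 4  # arbitrary, for illustration only


def partition_of(key):
    """Toy hash partitioning: NULL (None) always maps to partition 0."""
    return 0 if key is None else key % NUM_PARTITIONS


def right_outer_join(t2_ids, t3_ids):
    """Partitioned right outer join on t2.id = t3.id.

    Every output row stays in the partition chosen by t3.id, so unmatched
    t3 rows (t2.id = NULL) land wherever their t3.id hashes to.
    """
    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for t3_id in t3_ids:
        matches = [t2_id for t2_id in t2_ids if t2_id == t3_id]
        for t2_id in matches or [None]:
            partitions[partition_of(t3_id)].append((t2_id, t3_id))
    return partitions


def aggregate_per_partition(partitions):
    """count(*) group by t2.id, run independently in each partition."""
    result = []
    for rows in partitions:
        counts = Counter(t2_id for t2_id, _ in rows)
        result.extend(counts.items())
    return result


def exchange_on_t2_id(partitions):
    """Repartition rows by t2.id, i.e. the exchange the plan is missing."""
    shuffled = [[] for _ in range(NUM_PARTITIONS)]
    for rows in partitions:
        for row in rows:
            shuffled[partition_of(row[0])].append(row)
    return shuffled


joined = right_outer_join(t2_ids=[0, 1], t3_ids=[0, 1, 2, 3, 5, 6])
without_exchange = aggregate_per_partition(joined)
with_exchange = aggregate_per_partition(exchange_on_t2_id(joined))

# Counts reported for the NULL group in each case.
null_groups_without = [c for k, c in without_exchange if k is None]
null_groups_with = [c for k, c in with_exchange if k is None]
```

With this sample data, the aggregation without an exchange emits three separate NULL groups (one per partition that received an unmatched t3 row), while the aggregation after the exchange emits a single NULL group with count 4.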