Tianyi Wang created IMPALA-6213:
-----------------------------------

             Summary: The partitioning compatibility check is wrong in consecutive outer join cases
                 Key: IMPALA-6213
                 URL: https://issues.apache.org/jira/browse/IMPALA-6213
             Project: IMPALA
          Issue Type: Bug
          Components: Frontend
    Affects Versions: Impala 2.10.0
            Reporter: Tianyi Wang


Currently createAnalyticFragment() and createMergeAggregationFragment() use the
child fragment's partitioning info and refsNullableTupleId() to determine whether
the child fragment's partitioning can be reused directly, without an extra
exchange.
This is wrong because:
# The output partition of an outer join node is always set to its lhs input
partition, which is not correct for right and full outer joins.
# refsNullableTupleId() seems to be designed to handle the outer join case, but
it is not sufficient.
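Point 1 can be made concrete as a small rule for deciding which input's hash partitioning survives a partitioned equi-join. The following is a minimal Python sketch, not Impala's frontend code. JoinOp, output_partition_exprs() and needs_exchange() are hypothetical names, expressions are plain strings compared textually, and only the four basic join kinds are covered.

```python
from enum import Enum
from typing import List, Optional


class JoinOp(Enum):
    """Hypothetical stand-in for the planner's join operator kinds."""
    INNER = "INNER JOIN"
    LEFT_OUTER = "LEFT OUTER JOIN"
    RIGHT_OUTER = "RIGHT OUTER JOIN"
    FULL_OUTER = "FULL OUTER JOIN"


def output_partition_exprs(op: JoinOp, lhs_exprs: List[str],
                           rhs_exprs: List[str]) -> Optional[List[str]]:
    """Hash exprs that still describe the output of a partitioned equi-join.

    lhs_exprs / rhs_exprs are the exprs each input was shuffled on.
    Returns None when neither input's partitioning survives.
    """
    if op in (JoinOp.INNER, JoinOp.LEFT_OUTER):
        # Every output row keeps its lhs values unchanged (only rhs columns
        # may be NULL-padded), so the lhs partitioning is preserved.
        return lhs_exprs
    if op is JoinOp.RIGHT_OUTER:
        # Unmatched rhs rows get NULL lhs columns but keep their rhs values,
        # so only the rhs partitioning is preserved.
        return rhs_exprs
    # Full outer join: unmatched rows of either side are NULL-padded on the
    # other side, so no single input's partitioning describes the output.
    return None


def needs_exchange(grouping_exprs: List[str],
                   partition_exprs: Optional[List[str]]) -> bool:
    """An aggregation can skip the exchange only if its input is hash-partitioned
    on a non-empty subset of the grouping exprs (all rows of a group co-located)."""
    if not partition_exprs:
        return True
    return not set(partition_exprs) <= set(grouping_exprs)


if __name__ == "__main__":
    # Join 04 from the plan in this issue, followed by GROUP BY t2.id.
    part = output_partition_exprs(JoinOp.RIGHT_OUTER, ["t2.id"], ["t3.id"])
    print(part)                             # ['t3.id']
    print(needs_exchange(["t2.id"], part))  # True
```

Under this rule the right outer join's output is partitioned on t3.id rather than t2.id, so grouping on t2.id no longer matches and an exchange is required.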
Given the query
{noformat}
select /* +straight_join */ t2.id, count(*)
from functional.alltypes t1
left outer join /* +shuffle */ functional.alltypessmall t2
  on t1.int_col = t2.int_col
right outer join /* +shuffle */ functional.alltypestiny t3
  on t2.id = t3.id
group by t2.id
{noformat}
impala@3ddafcd29505614a01c8f4362396635c84ab4052 generates the following plan:
{noformat}
+--------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=5.81MB |
| Per-Host Resource Estimates: Memory=205.88MB     |
| Codegen disabled by planner                      |
|                                                  |
| PLAN-ROOT SINK                                   |
| |                                                |
| 10:EXCHANGE [UNPARTITIONED]                      |
| |                                                |
| 05:AGGREGATE [FINALIZE]                          |
| |  output: count(*)                              |
| |  group by: t2.id                               |
| |                                                |
| 04:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]     |
| |  hash predicates: t2.id = t3.id                |
| |  runtime filters: RF000 <- t3.id               |
| |                                                |
| |--09:EXCHANGE [HASH(t3.id)]                     |
| |  |                                             |
| |  02:SCAN HDFS [functional.alltypestiny t3]     |
| |     partitions=4/4 files=4 size=460B           |
| |                                                |
| 08:EXCHANGE [HASH(t2.id)]                        |
| |                                                |
| 03:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]      |
| |  hash predicates: t1.int_col = t2.int_col      |
| |                                                |
| |--07:EXCHANGE [HASH(t2.int_col)]                |
| |  |                                             |
| |  01:SCAN HDFS [functional.alltypessmall t2]    |
| |     partitions=4/4 files=4 size=6.32KB         |
| |     runtime filters: RF000 -> t2.id            |
| |                                                |
| 06:EXCHANGE [HASH(t1.int_col)]                   |
| |                                                |
| 00:SCAN HDFS [functional.alltypes t1]            |
|    partitions=24/24 files=24 size=478.45KB       |
+--------------------------------------------------+
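{noformat}
This plan is wrong because the rows with t2.id = NULL (the t3 rows that found no
match) can appear in any partition after the right outer join: they are routed
by HASH(t3.id), not HASH(t2.id). So it is incorrect to aggregate by t2.id without
an exchange.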
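To see why the missing exchange matters, the following Python sketch simulates the top of the plan (exchanges 08/09, join 04, aggregate 05) on made-up data. A couple of t2.id values stand in for the output of join 03, and ids 0 to 7 stand in for t3. The partition count, the modulo hash, and routing NULL keys to partition 0 are assumptions for illustration only, not Impala's actual hashing.

```python
from collections import Counter
from typing import Callable, List, Optional, Tuple

NUM_PARTITIONS = 3  # assumption: 3 fragment instances

Row = Tuple[Optional[int], int]  # (t2.id, t3.id); None stands for NULL


def partition_of(key: Optional[int], n: int = NUM_PARTITIONS) -> int:
    # Hypothetical hash: NULL keys go to partition 0, others by modulo.
    return 0 if key is None else key % n


def hash_partition(rows: list, key_fn: Callable, n: int = NUM_PARTITIONS) -> List[list]:
    """Simulate a HASH exchange: route each row by the hash of its key."""
    parts: List[list] = [[] for _ in range(n)]
    for row in rows:
        parts[partition_of(key_fn(row), n)].append(row)
    return parts


def right_outer_join(lhs_ids: List[Optional[int]], rhs_ids: List[int]) -> List[Row]:
    """t2 RIGHT OUTER JOIN t3 ON t2.id = t3.id within one partition."""
    out: List[Row] = []
    for rhs in rhs_ids:
        matches = [lhs for lhs in lhs_ids if lhs is not None and lhs == rhs]
        if matches:
            out.extend((lhs, rhs) for lhs in matches)
        else:
            out.append((None, rhs))  # unmatched rhs row: t2.id is NULL
    return out


def count_by_t2_id(rows: List[Row]) -> List[Tuple[Optional[int], int]]:
    """count(*) GROUP BY t2.id, finalized within a single partition."""
    return list(Counter(t2_id for t2_id, _ in rows).items())


def joined_partitions(t2_ids, t3_ids) -> List[List[Row]]:
    # 08:EXCHANGE [HASH(t2.id)] and 09:EXCHANGE [HASH(t3.id)], then join 04.
    lhs_parts = hash_partition(t2_ids, lambda k: k)
    rhs_parts = hash_partition(t3_ids, lambda k: k)
    return [right_outer_join(lhs, rhs) for lhs, rhs in zip(lhs_parts, rhs_parts)]


def plan_without_exchange(t2_ids, t3_ids):
    """The plan from this issue: aggregate each join partition in place."""
    result = []
    for part in joined_partitions(t2_ids, t3_ids):
        result.extend(count_by_t2_id(part))
    return result


def plan_with_exchange(t2_ids, t3_ids):
    """Add an extra HASH(t2.id) exchange between the join and the aggregation."""
    joined = [row for part in joined_partitions(t2_ids, t3_ids) for row in part]
    result = []
    for part in hash_partition(joined, lambda row: row[0]):
        result.extend(count_by_t2_id(part))
    return result


if __name__ == "__main__":
    t2_ids = [0, 1]          # made-up t2.id values surviving join 03
    t3_ids = list(range(8))  # made-up stand-in for alltypestiny ids
    print(sorted(plan_without_exchange(t2_ids, t3_ids), key=repr))
    # [(0, 1), (1, 1), (None, 2), (None, 2), (None, 2)]
    print(sorted(plan_with_exchange(t2_ids, t3_ids), key=repr))
    # [(0, 1), (1, 1), (None, 6)]
```

Both variants count the same 8 rows. Without the exchange, however, each partition finalizes its own t2.id = NULL group, so the result contains three NULL groups instead of a single group with count 6.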




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
