[ https://issues.apache.org/jira/browse/IMPALA-5850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Behm resolved IMPALA-5850.
------------------------------------
       Resolution: Fixed
    Fix Version/s: Impala 2.11.0

commit 8c8a1a65e02ed25f22d87adcb42069b269f2d6c1
Author: Alex Behm
Date:   Mon Aug 28 19:01:39 2017 -0700

    IMPALA-5850: Cast sender partition exprs under unions.
    
    For a series of partitioned joins within the same fragment we must
    cast the sender partition exprs of exchanges to compatible types.
    Otherwise, the hashes generated for identical partition values may
    differ among senders leading to wrong results.
    
    The bug was that this casting process was only performed for
    fragments that are hash-partitioned. However, a union produces a
    fragment with RANDOM partition, but the union could still contain
    partitioned joins whose senders need to be cast appropriately. The
    fix is to add casts regardless of the fragment's data partition.
    
    Testing:
    - Core/hdfs run passed
    - Added a new regression test
    
    Change-Id: I0aa801bcad8c2324d848349c7967d949224404e0
    Reviewed-on: http://gerrit.cloudera.org:8080/7884
    Reviewed-by: Alex Behm
    Tested-by: Impala Public Jenkins
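The commit message says that, without the casts, identical partition values may hash differently on different senders. The minimal Java sketch below illustrates why. It assumes, purely for illustration, that a sender hashes the raw big-endian bytes of the partition value with `java.util.Arrays.hashCode` and picks one of three receivers with `Math.floorMod`. Impala's backend uses its own hash function, and `PartitionHashDemo`, `partitionOfInt`, and `partitionOfBigint` are hypothetical names. The only point is that a 4-byte INT and an 8-byte BIGINT holding the same number have different byte encodings, so they can be routed to different join instances.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical model: a sender hashes the raw bytes of the partition value.
// Arrays.hashCode is a stand-in; it is NOT the hash function Impala uses.
public class PartitionHashDemo {
    static final int NUM_PARTITIONS = 3; // assumed number of join instances

    // Receiver chosen for an INT partition expr (4-byte big-endian encoding).
    static int partitionOfInt(int v) {
        byte[] bytes = ByteBuffer.allocate(Integer.BYTES).putInt(v).array();
        return Math.floorMod(Arrays.hashCode(bytes), NUM_PARTITIONS);
    }

    // Receiver chosen for a BIGINT partition expr (8-byte big-endian encoding).
    static int partitionOfBigint(long v) {
        byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(v).array();
        return Math.floorMod(Arrays.hashCode(bytes), NUM_PARTITIONS);
    }

    public static void main(String[] args) {
        for (int id = 1; id <= 8; id++) {
            int probe = partitionOfBigint(id);             // b.id is BIGINT on the probe side
            int buildNoCast = partitionOfInt(id);          // build-side sender hashing an INT
            int buildCast = partitionOfBigint((long) id);  // build-side sender after CAST(id AS BIGINT)
            System.out.printf("id=%d probe=%d build(no cast)=%d build(cast)=%d%n",
                id, probe, buildNoCast, buildCast);
        }
    }
}
```

With this stand-in hash, every id from 1 to 8 is routed to a different receiver when sent as an INT than when sent as a BIGINT, while the cast version always agrees with the BIGINT probe side. Casting before hashing makes the byte encodings, and therefore the routing, identical on both sides of the join.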


> Partitioned hash join inside union may return wrong results
> -----------------------------------------------------------
>
>                 Key: IMPALA-5850
>                 URL: https://issues.apache.org/jira/browse/IMPALA-5850
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Frontend
>    Affects Versions: Impala 2.3.0, Impala 2.4.0, Impala 2.5.0, Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0
>            Reporter: Alexander Behm
>            Assignee: Alexander Behm
>            Priority: Blocker
>              Labels: correctness
>             Fix For: Impala 2.11.0
>
>
> Impala may return wrong results for plans that have a partitioned join inside 
> a union.
> *Affected queries*
> * plan has a partitioned join inside a union
> * tables must have stats; otherwise a partitioned join would not be chosen
> * for at least one equi-join condition, the left-hand side and right-hand side join keys have different types (e.g. INT and BIGINT)
> *Reproduction*
> Setup:
> {code}
> create table a (id int);
> insert into a values (1),(2),(3),(4);
> insert into a values (5),(6),(7),(8);
> compute stats a;
> create table b (id bigint);
> insert into b values (1),(2),(3),(4);
> insert into b values (5),(6),(7),(8);
> compute stats b;
> {code}
> Query that returns correct results:
> {code}
> select v.id from
> (select distinct id from a) v join b
>  on v.id = b.id
> +----+
> | id |
> +----+
> | 1  |
> | 2  |
> | 3  |
> | 4  |
> | 5  |
> | 6  |
> | 7  |
> | 8  |
> +----+
> Fetched 8 row(s) in 0.20s
> {code}
> Query that returns wrong results:
> {code}
> select null from a limit 0
> union all
> select v.id from
> (select distinct id from a) v join b
>  on v.id = b.id
> +------+
> | null |
> +------+
> | 3    |
> | 5    |
> | 6    |
> | 7    |
> | 8    |
> +------+
> Fetched 5 row(s) in 0.12s
> Plan:
> +--------------------------------------------------+
> | Explain String                                   |
> +--------------------------------------------------+
> | Max Per-Host Resource Reservation: Memory=3.88MB |
> | Per-Host Resource Estimates: Memory=85.94MB      |
> | Codegen disabled by planner                      |
> |                                                  |
> | PLAN-ROOT SINK                                   |
> | |                                                |
> | 08:EXCHANGE [UNPARTITIONED]                      |
> | |                                                |
> | 00:UNION                                         |
> | |                                                |
> | 04:HASH JOIN [INNER JOIN, PARTITIONED]           |  <--- Partitioned join inside union
> | |  hash predicates: b.id = id                    |
> | |  runtime filters: RF000 <- id                  |
> | |                                                |
> | |--06:AGGREGATE [FINALIZE]                       |
> | |  |  group by: id                               |
> | |  |                                             |
> | |  05:EXCHANGE [HASH(id)]                        |
> | |  |                                             |
> | |  02:AGGREGATE [STREAMING]                      |
> | |  |  group by: id                               |
> | |  |                                             |
> | |  01:SCAN HDFS [default.a]                      |
> | |     partitions=1/1 files=2 size=16B            |
> | |                                                |
> | 07:EXCHANGE [HASH(b.id)]                         |
> | |                                                |
> | 03:SCAN HDFS [default.b]                         |
> |    partitions=1/1 files=2 size=16B               |
> |    runtime filters: RF000 -> b.id                |
> +--------------------------------------------------+
> {code}
> *Analysis*
> The bug is a missing implicit cast in EXCHANGE 05. Its partition expression id
> (an INT) should be cast to BIGINT to be consistent with b.id, the left input of
> the join.
> We already have code to properly cast partition expressions in exchanges, but
> the code incorrectly assumes that we only need to do so for hash-partitioned
> fragments. The problem is that the UNION makes the fragment RANDOM
> partitioned: because the union children could be arbitrarily partitioned,
> there is no guarantee about how the fragment's output is partitioned.
> The buggy code is in PlanFragment#finalizeExchanges():
> {code}
> public void finalizeExchanges(Analyzer analyzer)
>     throws InternalException, NotImplementedException {
>   if (destNode_ != null) {
>     Preconditions.checkState(sink_ == null);
>     // we're streaming to an exchange node
>     DataStreamSink streamSink = new DataStreamSink(destNode_, outputPartition_);
>     streamSink.setFragment(this);
>     sink_ = streamSink;
>   }
>   if (!dataPartition_.isHashPartitioned()) return;  // <--- Problem here
>   // ... the following code adds casts to exchanges
> }
> {code}
> *Workaround*
> * Use the broadcast and straight_join hints to force the join to use a broadcast distribution strategy
> * Reformulate the query to avoid the join inside a union
> * Write the join result into a separate table and use that table in the original query instead of the join
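The fix described in the commit message (add casts regardless of the fragment's data partition) can be modeled with a toy Java sketch. Everything here is hypothetical: `FinalizeExchangesModel`, `SenderExpr`, `castSenders`, and the two-type INT/BIGINT compatibility rule are simplifications, not Impala's `PlanFragment` API. The buggy variant mirrors the early `return` in `finalizeExchanges()` for fragments that are not hash-partitioned; the fixed variant always casts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the exchange-cast logic; names are hypothetical, not Impala's API.
public class FinalizeExchangesModel {
    enum DataPartition { HASH, RANDOM }

    // A sender's partition expression and its type ("INT" or "BIGINT" only).
    static final class SenderExpr {
        final String expr;
        final String type;
        SenderExpr(String expr, String type) { this.expr = expr; this.type = type; }
    }

    // In this toy model, the compatible type of INT and BIGINT is BIGINT.
    static String compatibleType(String a, String b) {
        return (a.equals("BIGINT") || b.equals("BIGINT")) ? "BIGINT" : "INT";
    }

    // Casts every sender's expression to the common type of all senders.
    static List<String> castSenders(List<SenderExpr> senders) {
        String target = senders.get(0).type;
        for (SenderExpr s : senders) target = compatibleType(target, s.type);
        List<String> result = new ArrayList<>();
        for (SenderExpr s : senders) {
            result.add(s.type.equals(target) ? s.expr : "CAST(" + s.expr + " AS " + target + ")");
        }
        return result;
    }

    // Buggy: skips the casts unless the fragment is hash-partitioned.
    static List<String> finalizeBuggy(DataPartition partition, List<SenderExpr> senders) {
        if (partition != DataPartition.HASH) {
            List<String> uncast = new ArrayList<>();
            for (SenderExpr s : senders) uncast.add(s.expr);
            return uncast;
        }
        return castSenders(senders);
    }

    // Fixed: casts regardless of the fragment's data partition.
    static List<String> finalizeFixed(DataPartition partition, List<SenderExpr> senders) {
        return castSenders(senders); // 'partition' is intentionally ignored
    }

    public static void main(String[] args) {
        // Join 04 from the plan: probe side b.id (BIGINT), build side id (INT).
        List<SenderExpr> senders = Arrays.asList(
            new SenderExpr("b.id", "BIGINT"), new SenderExpr("id", "INT"));
        // The union makes the fragment RANDOM partitioned.
        System.out.println(finalizeBuggy(DataPartition.RANDOM, senders)); // [b.id, id]
        System.out.println(finalizeFixed(DataPartition.RANDOM, senders)); // [b.id, CAST(id AS BIGINT)]
    }
}
```

In the buggy variant, the RANDOM partitioning introduced by the union leaves the build-side exchange hashing an uncast INT while the probe-side exchange hashes a BIGINT. The fixed variant produces the same cast that a hash-partitioned fragment would already receive.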



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
