[ 
https://issues.apache.org/jira/browse/IMPALA-13428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

FengZhou updated IMPALA-13428:
------------------------------
    Description: 
When the right side of a LEFT JOIN contains a window function and the outer
query filters on that window function's output column, predicate pushdown
produces incorrect results.
{code:java}
create table if not exists test.a(
       a string
       , b string
);

insert into test.a
values ('a', '1')
       , ('b', '2')
;


create table if not exists test.b(
       a string
       , b string
       , c string
);

insert into test.b
values ('a', '1', '1')
       , ('b', '2', '2')
       , ('c', '3', '3')
       , ('c', '3', '4')
       , ('c', '3', '5')
;   


select *
from (SELECT  
              t2.b
      FROM test.a t1
 LEFT JOIN (SELECT a
          , ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) b
         -- , count(1) b
         FROM test.b) T2
ON t1.a=t2.a
) t
where b = 10{code}
The correct result is an empty set. The outer filter `b = 10` is applied after
the join, and it can never be true for a NULL-extended row, so those rows must
be removed. However, Impala 4.1.2 returns 4 records with *NULL* values.

Here is the execution plan.

 
{code:java}
PLAN-ROOT SINK
|
09:EXCHANGE [UNPARTITIONED]
|
05:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
|  hash predicates: a = t1.a
|  row-size=44B cardinality=224
|
|--08:EXCHANGE [HASH(t1.a)]
|  |
|  00:SCAN HDFS [test.a t1]
|     HDFS partitions=1/1 files=2 size=1.06KB
|     row-size=12B cardinality=224
|
04:SELECT
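|  predicates: row_number() = 10   <---- Problem: the predicate is evaluated only below the outer join and is never re-applied at the join, so NULL-extended rows pass through.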
|  row-size=32B cardinality=218
|
03:ANALYTIC
|  functions: row_number()
|  partition by: a
|  order by: b ASC
|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
|  row-size=32B cardinality=218
|
07:TOP-N
|  partition by: a
|  order by: b ASC
|  partition limit: 10
|  row-size=24B cardinality=218
|
06:EXCHANGE [HASH(a)]
|
02:TOP-N
|  partition by: a
|  order by: b ASC
|  partition limit: 10
|  source expr: row_number() = CAST(10 AS BIGINT)
|  row-size=24B cardinality=218
|
01:SCAN HDFS [test.b]
   HDFS partitions=1/1 files=2 size=1.54KB
   row-size=24B cardinality=218{code}
 

By contrast, the equivalent query using the count(*) aggregate function returns the correct result.
{code:java}
select *
from (SELECT  
              t2.b
      FROM test.a t1
 LEFT JOIN (SELECT a
         -- , ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) b
         , count(1) b
         FROM test.b
         group by 1) T2
ON t1.a=t2.a
) t
where b = 10 {code}
{code:java}
PLAN-ROOT SINK
|
07:EXCHANGE [UNPARTITIONED]
|
03:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
|  hash predicates: a = t1.a
|  other predicates: count(*) = 10  <------------ Correct: the predicate is also re-applied at the join, which removes NULL-extended rows.
|  runtime filters: RF000 <- t1.a
|  row-size=32B cardinality=224
|
|--06:EXCHANGE [HASH(t1.a)]
|  |
|  00:SCAN HDFS [test.a t1]
|     HDFS partitions=1/1 files=2 size=1.06KB
|     row-size=12B cardinality=224
|
05:AGGREGATE [FINALIZE]
|  output: count:merge(*)
|  group by: a
|  having: count(*) = 10
|  row-size=20B cardinality=218
|
04:EXCHANGE [HASH(a)]
|
02:AGGREGATE [STREAMING]
|  output: count(*)
|  group by: a
|  row-size=20B cardinality=218
|
01:SCAN HDFS [test.b]
   HDFS partitions=1/1 files=2 size=1.54KB
   runtime filters: RF000 -> test.b.a
   row-size=12B cardinality=218 {code}

  was:
When using a window function, and performing a left join, filtering based on 
the field used by the window function, inconsistencies can occur during the 
pushdown process.
{code:java}
create table if not exists test.a(
       a string
       , b string
);

insert into test.a
values ('a', '1')
       , ('b', '2')
;


create table if not exists test.b(
       a string
       , b string
       , c string
);

insert into test.b
values ('a', '1', '1')
       , ('b', '2', '2')
       , ('c', '3', '3')
       , ('c', '3', '4')
       , ('c', '3', '5')
;   


select *
from (SELECT  
              t2.b
      FROM test.a t1
 LEFT JOIN (SELECT a
          , ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) b
         -- , count(1) b
         FROM test.b) T2
ON t1.a=t2.a
) t
where b = 10{code}
The correct result should have no records at all. However, in version 4.1.2, 
the result that was executed contains 4 records with *NULL* values.

Here is the execution plan.

 
{code:java}
PLAN-ROOT SINK
|
09:EXCHANGE [UNPARTITIONED]
|
05:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
|  hash predicates: a = t1.a
|  row-size=44B cardinality=224
|
|--08:EXCHANGE [HASH(t1.a)]
|  |
|  00:SCAN HDFS [test.a t1]
|     HDFS partitions=1/1 files=2 size=1.06KB
|     row-size=12B cardinality=224
|
04:SELECT
|  predicates: row_number() = 10   <---- There seems to be a problem here. 
|  row-size=32B cardinality=218
|
03:ANALYTIC
|  functions: row_number()
|  partition by: a
|  order by: b ASC
|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
|  row-size=32B cardinality=218
|
07:TOP-N
|  partition by: a
|  order by: b ASC
|  partition limit: 10
|  row-size=24B cardinality=218
|
06:EXCHANGE [HASH(a)]
|
02:TOP-N
|  partition by: a
|  order by: b ASC
|  partition limit: 10
|  source expr: row_number() = CAST(10 AS BIGINT)
|  row-size=24B cardinality=218
|
01:SCAN HDFS [test.b]
   HDFS partitions=1/1 files=2 size=1.54KB
   row-size=24B cardinality=218{code}
 

 


> Using a left join with a window function results in incorrect outcomes.
> -----------------------------------------------------------------------
>
>                 Key: IMPALA-13428
>                 URL: https://issues.apache.org/jira/browse/IMPALA-13428
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Backend
>    Affects Versions: Impala 4.1.2
>            Reporter: FengZhou
>            Priority: Critical
>
> When the right side of a LEFT JOIN contains a window function and the outer
> query filters on that window function's output column, predicate pushdown
> produces incorrect results.
> {code:java}
> create table if not exists test.a(
>        a string
>        , b string
> );
> insert into test.a
> values ('a', '1')
>        , ('b', '2')
> ;
> create table if not exists test.b(
>        a string
>        , b string
>        , c string
> );
> insert into test.b
> values ('a', '1', '1')
>        , ('b', '2', '2')
>        , ('c', '3', '3')
>        , ('c', '3', '4')
>        , ('c', '3', '5')
> ;   
> select *
> from (SELECT  
>               t2.b
>       FROM test.a t1
>  LEFT JOIN (SELECT a
>           , ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) b
>          -- , count(1) b
>          FROM test.b) T2
> ON t1.a=t2.a
> ) t
> where b = 10{code}
> The correct result is an empty set. The outer filter `b = 10` is applied after
> the join, and it can never be true for a NULL-extended row, so those rows must
> be removed. However, Impala 4.1.2 returns 4 records with *NULL* values.
> Here is the execution plan.
>  
> {code:java}
> PLAN-ROOT SINK
> |
> 09:EXCHANGE [UNPARTITIONED]
> |
> 05:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
> |  hash predicates: a = t1.a
> |  row-size=44B cardinality=224
> |
> |--08:EXCHANGE [HASH(t1.a)]
> |  |
> |  00:SCAN HDFS [test.a t1]
> |     HDFS partitions=1/1 files=2 size=1.06KB
> |     row-size=12B cardinality=224
> |
> 04:SELECT
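> |  predicates: row_number() = 10   <---- Problem: the predicate is evaluated only below the outer join and is never re-applied at the join, so NULL-extended rows pass through.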
> |  row-size=32B cardinality=218
> |
> 03:ANALYTIC
> |  functions: row_number()
> |  partition by: a
> |  order by: b ASC
> |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
> |  row-size=32B cardinality=218
> |
> 07:TOP-N
> |  partition by: a
> |  order by: b ASC
> |  partition limit: 10
> |  row-size=24B cardinality=218
> |
> 06:EXCHANGE [HASH(a)]
> |
> 02:TOP-N
> |  partition by: a
> |  order by: b ASC
> |  partition limit: 10
> |  source expr: row_number() = CAST(10 AS BIGINT)
> |  row-size=24B cardinality=218
> |
> 01:SCAN HDFS [test.b]
>    HDFS partitions=1/1 files=2 size=1.54KB
>    row-size=24B cardinality=218{code}
>  
> By contrast, the equivalent query using the count(*) aggregate function returns the correct result.
> {code:java}
> select *
> from (SELECT  
>               t2.b
>       FROM test.a t1
>  LEFT JOIN (SELECT a
>          -- , ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) b
>          , count(1) b
>          FROM test.b
>          group by 1) T2
> ON t1.a=t2.a
> ) t
> where b = 10 {code}
> {code:java}
> PLAN-ROOT SINK
> |
> 07:EXCHANGE [UNPARTITIONED]
> |
> 03:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
> |  hash predicates: a = t1.a
> |  other predicates: count(*) = 10  <------------ Correct: the predicate is also re-applied at the join, which removes NULL-extended rows.
> |  runtime filters: RF000 <- t1.a
> |  row-size=32B cardinality=224
> |
> |--06:EXCHANGE [HASH(t1.a)]
> |  |
> |  00:SCAN HDFS [test.a t1]
> |     HDFS partitions=1/1 files=2 size=1.06KB
> |     row-size=12B cardinality=224
> |
> 05:AGGREGATE [FINALIZE]
> |  output: count:merge(*)
> |  group by: a
> |  having: count(*) = 10
> |  row-size=20B cardinality=218
> |
> 04:EXCHANGE [HASH(a)]
> |
> 02:AGGREGATE [STREAMING]
> |  output: count(*)
> |  group by: a
> |  row-size=20B cardinality=218
> |
> 01:SCAN HDFS [test.b]
>    HDFS partitions=1/1 files=2 size=1.54KB
>    runtime filters: RF000 -> test.b.a
>    row-size=12B cardinality=218 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to