[
https://issues.apache.org/jira/browse/IMPALA-13428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
FengZhou updated IMPALA-13428:
------------------------------
Description:
When a query left-joins a subquery that contains a window function and then
filters on the column produced by that window function, predicate pushdown
can produce incorrect results.
{code:java}
create table if not exists test.a(
  a string
  , b string
);
insert into test.a
values ('a', '1')
  , ('b', '2')
;
create table if not exists test.b(
  a string
  , b string
  , c string
);
insert into test.b
values ('a', '1', '1')
  , ('b', '2', '2')
  , ('c', '3', '3')
  , ('c', '3', '4')
  , ('c', '3', '5')
;
select *
from (SELECT
    t2.b
  FROM test.a t1
  LEFT JOIN (SELECT a
      , ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) b
      -- , count(1) b
    FROM test.b) T2
  ON t1.a=t2.a
) t
where b = 10{code}
The correct result is an empty set. However, in version 4.1.2 the query
returns 4 rows containing *NULL* values.
Here is the execution plan.
{code:java}
PLAN-ROOT SINK
|
09:EXCHANGE [UNPARTITIONED]
|
05:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
|  hash predicates: a = t1.a
|  row-size=44B cardinality=224
|
|--08:EXCHANGE [HASH(t1.a)]
|  |
|  00:SCAN HDFS [test.a t1]
|     HDFS partitions=1/1 files=2 size=1.06KB
|     row-size=12B cardinality=224
|
04:SELECT
|  predicates: row_number() = 10  <---- There seems to be a problem here.
|  row-size=32B cardinality=218
|
03:ANALYTIC
|  functions: row_number()
|  partition by: a
|  order by: b ASC
|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
|  row-size=32B cardinality=218
|
07:TOP-N
|  partition by: a
|  order by: b ASC
|  partition limit: 10
|  row-size=24B cardinality=218
|
06:EXCHANGE [HASH(a)]
|
02:TOP-N
|  partition by: a
|  order by: b ASC
|  partition limit: 10
|  source expr: row_number() = CAST(10 AS BIGINT)
|  row-size=24B cardinality=218
|
01:SCAN HDFS [test.b]
   HDFS partitions=1/1 files=2 size=1.54KB
   row-size=24B cardinality=218{code}
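In this plan the predicate appears only in 04:SELECT, below the outer join, and 05:HASH JOIN lists nothing but its hash predicates. The following is a minimal sketch in plain Python (a model, not Impala code) of why that placement changes the result. It applies {{row_number() = 10}} either only below the join or on the join output. The helper names {{row_numbers}} and {{left_join}} and the tuple layout are hypothetical, and {{None}} stands in for NULL.

```python
from collections import defaultdict

# Rows from the reproduction: test.a(a, b) and test.b(a, b, c).
table_a = [("a", "1"), ("b", "2")]
table_b = [("a", "1", "1"), ("b", "2", "2"),
           ("c", "3", "3"), ("c", "3", "4"), ("c", "3", "5")]


def row_numbers(rows):
    """Model ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) as (a, rn) pairs."""
    partitions = defaultdict(list)
    for a, b, _c in rows:
        partitions[a].append(b)
    out = []
    for a, values in partitions.items():
        for rn, _b in enumerate(sorted(values), start=1):
            out.append((a, rn))
    return out


def left_join(left, right):
    """Model t1 LEFT JOIN t2 ON t1.a = t2.a, projecting only t2's rn column."""
    out = []
    for a, _b in left:
        matches = [rn for key, rn in right if key == a]
        # Unmatched left rows are null-extended.
        out.extend(matches if matches else [None])
    return out


t2 = row_numbers(table_b)

# Shape of the reported plan: the predicate runs only below the join,
# so every row of table_a survives as a null-extended row.
filtered_below = [(a, rn) for a, rn in t2 if rn == 10]
pushed_down_only = left_join(table_a, filtered_below)

# Query semantics: the predicate applies to the join output, where
# NULL = 10 does not evaluate to true and the row is dropped.
applied_on_output = [rn for rn in left_join(table_a, t2)
                     if rn is not None and rn == 10]

print(pushed_down_only)   # one None per row of table_a
print(applied_on_output)  # empty list
```

The number of NULL rows in the first case equals the number of rows on the preserved side of the join, so in this sketch it depends only on how many rows {{table_a}} holds.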
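However, the equivalent query using the count(*) aggregate function instead
of the window function is OK: the predicate is kept as an "other predicate"
on the join.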
{code:java}
select *
from (SELECT
    t2.b
  FROM test.a t1
  LEFT JOIN (SELECT a
      -- , ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) b
      , count(1) b
    FROM test.b
    group by 1) T2
  ON t1.a=t2.a
) t
where b = 10 {code}
{code:java}
PLAN-ROOT SINK
|
07:EXCHANGE [UNPARTITIONED]
|
03:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
|  hash predicates: a = t1.a
|  other predicates: count(*) = 10  <------------ It's OK here.
|  runtime filters: RF000 <- t1.a
|  row-size=32B cardinality=224
|
|--06:EXCHANGE [HASH(t1.a)]
|  |
|  00:SCAN HDFS [test.a t1]
|     HDFS partitions=1/1 files=2 size=1.06KB
|     row-size=12B cardinality=224
|
05:AGGREGATE [FINALIZE]
|  output: count:merge(*)
|  group by: a
|  having: count(*) = 10
|  row-size=20B cardinality=218
|
04:EXCHANGE [HASH(a)]
|
02:AGGREGATE [STREAMING]
|  output: count(*)
|  group by: a
|  row-size=20B cardinality=218
|
01:SCAN HDFS [test.b]
   HDFS partitions=1/1 files=2 size=1.54KB
   runtime filters: RF000 -> test.b.a
   row-size=12B cardinality=218 {code}
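As an independent cross-check of the expected semantics, both queries can be run against another SQL engine. The sketch below uses Python's built-in {{sqlite3}} module rather than Impala. It assumes the bundled SQLite is 3.25 or newer (needed for window functions), attaches an in-memory database under the name {{test}} so the table names match, and declares the columns as {{text}} instead of {{string}} to avoid SQLite's numeric type affinity. Under those assumptions both variants should return no rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumption: an attached in-memory database named "test" stands in for
# the Impala database of the same name.
conn.execute("ATTACH DATABASE ':memory:' AS test")
conn.execute("CREATE TABLE test.a (a text, b text)")
conn.execute("CREATE TABLE test.b (a text, b text, c text)")
conn.execute("INSERT INTO test.a VALUES ('a', '1'), ('b', '2')")
conn.execute(
    "INSERT INTO test.b VALUES ('a', '1', '1'), ('b', '2', '2'), "
    "('c', '3', '3'), ('c', '3', '4'), ('c', '3', '5')"
)

# Variant from the report that uses the window function.
window_query = """
SELECT *
FROM (SELECT t2.b
      FROM test.a t1
      LEFT JOIN (SELECT a,
                        ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) b
                 FROM test.b) t2
        ON t1.a = t2.a) t
WHERE b = 10
"""

# Variant from the report that uses the aggregate function.
count_query = """
SELECT *
FROM (SELECT t2.b
      FROM test.a t1
      LEFT JOIN (SELECT a, count(1) b
                 FROM test.b
                 GROUP BY 1) t2
        ON t1.a = t2.a) t
WHERE b = 10
"""

window_rows = conn.execute(window_query).fetchall()
count_rows = conn.execute(count_query).fetchall()
print(window_rows, count_rows)
conn.close()
```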
> Using a left join with a window function results in incorrect outcomes.
> -----------------------------------------------------------------------
>
> Key: IMPALA-13428
> URL: https://issues.apache.org/jira/browse/IMPALA-13428
> Project: IMPALA
> Issue Type: Bug
> Components: Backend
> Affects Versions: Impala 4.1.2
> Reporter: FengZhou
> Priority: Critical