[
https://issues.apache.org/jira/browse/IMPALA-10785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386160#comment-17386160
]
pengdou1990 commented on IMPALA-10785:
--------------------------------------
I have tested the official solution and my solution; my solution seems to have
some improvement. The test details are as follows:
h2. Tables:
{code:sql}
-- HDFS table
CREATE EXTERNAL TABLE tpcds_10000_parquet.customer_parquet (
c_customer_sk INT,
c_customer_id VARCHAR(16),
c_current_cdemo_sk INT,
c_current_hdemo_sk INT,
c_current_addr_sk INT,
c_first_shipto_date_sk INT,
c_first_sales_date_sk INT,
c_salutation VARCHAR(10),
c_first_name VARCHAR(20),
c_last_name VARCHAR(30),
c_preferred_cust_flag VARCHAR(1),
c_birth_day INT,
c_birth_month INT,
c_birth_year INT,
c_birth_country VARCHAR(20),
c_login VARCHAR(13),
c_email_address VARCHAR(50),
c_last_review_date_sk INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
WITH SERDEPROPERTIES ('field.delim'='|', 'serialization.format'='|')
STORED AS PARQUET
{code}
{code:sql}
-- Kudu table
CREATE EXTERNAL TABLE tpcds_10000_parquet.customer_kudu (
c_customer_sk INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_customer_id STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_current_cdemo_sk INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_current_hdemo_sk INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_current_addr_sk INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_first_shipto_date_sk INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_first_sales_date_sk INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_salutation STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_first_name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_last_name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_preferred_cust_flag STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_birth_day INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_birth_month INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_birth_year INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_birth_country STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_login STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_email_address STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
c_last_review_date_sk INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c_customer_sk)
)
PARTITION BY HASH (c_customer_sk) PARTITIONS 9
STORED AS KUDU
{code}
h2. Test SQL
{code:sql}
select max(c_customer_sk),
       ndv(c_customer_id),
       ndv(c_salutation),
       ndv(c_first_name),
       ndv(c_last_name)
from (
  select c_customer_sk,
         c_customer_id,
         c_salutation,
         c_first_name,
         c_last_name
  from customer_parquet
  union all
  select c_customer_sk,
         c_customer_id,
         c_salutation,
         c_first_name,
         c_last_name
  from customer_kudu
) t
{code}
h2. 4.0.0 Text Plan
{code:java}
Max Per-Host Resource Reservation: Memory=28.00MB Threads=3
Per-Host Resource Estimates: Memory=356MB
WARNING: The following tables are missing relevant table and/or column statistics.
tpcds_10000_parquet.customer_kudu, tpcds_10000_parquet.customer_parquet
Analyzed query: SELECT max(c_customer_sk), ndv(c_customer_id), ndv(c_salutation), ndv(c_first_name), ndv(c_last_name) FROM (SELECT c_customer_sk, c_customer_id, c_salutation, c_first_name, c_last_name FROM tpcds_10000_parquet.customer_parquet UNION ALL SELECT c_customer_sk, c_customer_id, c_salutation, c_first_name, c_last_name FROM tpcds_10000_parquet.customer_kudu) t
F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
PLAN-ROOT SINK
| output exprs: max(c_customer_sk), ndv(c_customer_id), ndv(c_salutation), ndv(c_first_name), ndv(c_last_name)
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
|
05:AGGREGATE [FINALIZE]
| output: max:merge(c_customer_sk), ndv:merge(c_customer_id), ndv:merge(c_salutation), ndv:merge(c_first_name), ndv:merge(c_last_name)
| mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
| tuple-ids=5 row-size=36B cardinality=1
| in pipelines: 05(GETNEXT), 03(OPEN)
|
04:EXCHANGE [UNPARTITIONED]
| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
| tuple-ids=4 row-size=36B cardinality=1
| in pipelines: 03(GETNEXT)
|
F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
Per-Host Resources: mem-estimate=352.02MB mem-reservation=24.00MB thread-reservation=2
03:AGGREGATE
| output: max(c_customer_sk), ndv(c_customer_id), ndv(c_salutation), ndv(c_first_name), ndv(c_last_name)
| mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
| tuple-ids=4 row-size=36B cardinality=1
| in pipelines: 03(GETNEXT), 01(OPEN), 02(OPEN)
|
00:UNION
| mem-estimate=0B mem-reservation=0B thread-reservation=0
| tuple-ids=2 row-size=52B cardinality=23.34M
| in pipelines: 01(GETNEXT), 02(GETNEXT)
|
|--02:SCAN KUDU [tpcds_10000_parquet.customer_kudu]
| mem-estimate=7.50MB mem-reservation=0B thread-reservation=1
| tuple-ids=1 row-size=68B cardinality=unavailable
| in pipelines: 02(GETNEXT)
|
01:SCAN HDFS [tpcds_10000_parquet.customer_parquet, RANDOM]
HDFS partitions=1/1 files=3 size=609.01MB
stored statistics:
table: rows=unavailable size=unavailable
columns: unavailable
extrapolated-rows=disabled max-scan-range-rows=unavailable
mem-estimate=352.00MB mem-reservation=24.00MB thread-reservation=1
tuple-ids=0 row-size=52B cardinality=23.34M
in pipelines: 01(GETNEXT)
{code}
h2. 4.0.0 Summary
{code:java}
Operator             #Hosts  #Inst  Avg Time   Max Time   #Rows   Est. #Rows  Peak Mem   Est. Peak Mem  Detail
--------------------------------------------------------------------------------------------------------------------------------------------
F03:ROOT             1       1      0.000ns    0.000ns                        4.01 MB    4.00 MB
05:AGGREGATE         1       1      0.000ns    0.000ns    1       1           16.00 KB   16.00 KB       FINALIZE
04:EXCHANGE          1       1      0.000ns    0.000ns    3       1           32.00 KB   16.00 KB       UNPARTITIONED
F02:EXCHANGE SENDER  3       3      333.343us  1.000ms                        24.00 B    0
03:AGGREGATE         3       3      466.680ms  493.014ms  3       1           175.00 KB  16.00 KB
00:UNION             3       3      464.013ms  480.014ms  24.51M  23.34M      1.27 MB    0
|--02:SCAN KUDU      3       3      843.025ms  887.026ms  12.26M  -1          4.83 MB    7.50 MB        tpcds_10000_parquet.customer_kudu
01:SCAN HDFS         3       3      118.336ms  155.004ms  12.26M  23.34M      70.31 MB   352.00 MB      tpcds_10000_parquet.customer_parquet
{code}
h2. 4.0.0 + my solution Text Plan
{code:java}
Max Per-Host Resource Reservation: Memory=28.00MB Threads=3
Per-Host Resource Estimates: Memory=356MB
WARNING: The following tables are missing relevant table and/or column statistics.
tpcds_10000_parquet.customer_kudu, tpcds_10000_parquet.customer_parquet
Analyzed query: SELECT max(c_customer_sk), ndv(c_customer_id), ndv(c_salutation), ndv(c_first_name), ndv(c_last_name) FROM (SELECT c_customer_sk, c_customer_id, c_salutation, c_first_name, c_last_name FROM tpcds_10000_parquet.customer_parquet UNION ALL SELECT c_customer_sk, c_customer_id, c_salutation, c_first_name, c_last_name FROM tpcds_10000_parquet.customer_kudu) t
F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
PLAN-ROOT SINK
| output exprs: max(c_customer_sk), ndv(c_customer_id), ndv(c_salutation), ndv(c_first_name), ndv(c_last_name)
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
|
05:AGGREGATE [FINALIZE]
| output: max:merge(c_customer_sk), ndv:merge(c_customer_id), ndv:merge(c_salutation), ndv:merge(c_first_name), ndv:merge(c_last_name)
| mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
| tuple-ids=5 row-size=36B cardinality=1
| in pipelines: 05(GETNEXT), 03(OPEN)
|
04:EXCHANGE [UNPARTITIONED]
| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
| tuple-ids=4 row-size=36B cardinality=1
| in pipelines: 03(GETNEXT)
|
F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
Per-Host Resources: mem-estimate=352.02MB mem-reservation=24.00MB thread-reservation=2
03:AGGREGATE
| output: max(c_customer_sk), ndv(c_customer_id), ndv(c_salutation), ndv(c_first_name), ndv(c_last_name)
| mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
| tuple-ids=4 row-size=36B cardinality=1
| in pipelines: 03(GETNEXT), 01(OPEN), 02(OPEN)
|
00:UNION
| pass-through-operands: all
| mem-estimate=0B mem-reservation=0B thread-reservation=0
| tuple-ids=2 row-size=68B cardinality=23.34M
| in pipelines: 01(GETNEXT), 02(GETNEXT)
|
|--02:SCAN KUDU [tpcds_10000_parquet.customer_kudu]
| mem-estimate=7.50MB mem-reservation=0B thread-reservation=1
| tuple-ids=1 row-size=68B cardinality=unavailable
| in pipelines: 02(GETNEXT)
|
01:SCAN HDFS [tpcds_10000_parquet.customer_parquet, RANDOM]
HDFS partitions=1/1 files=3 size=609.01MB
stored statistics:
table: rows=unavailable size=unavailable
columns: unavailable
extrapolated-rows=disabled max-scan-range-rows=unavailable
mem-estimate=352.00MB mem-reservation=24.00MB thread-reservation=1
tuple-ids=0 row-size=68B cardinality=23.34M
in pipelines: 01(GETNEXT)
{code}
h2. 4.0.0 + my solution Summary
{code:java}
Operator             #Hosts  #Inst  Avg Time   Max Time   #Rows   Est. #Rows  Peak Mem   Est. Peak Mem  Detail
--------------------------------------------------------------------------------------------------------------------------------------------
F03:ROOT             1       1      2.000ms    2.000ms                        4.01 MB    4.00 MB
05:AGGREGATE         1       1      0.000ns    0.000ns    1       1           16.00 KB   16.00 KB       FINALIZE
04:EXCHANGE          1       1      0.000ns    0.000ns    3       1           32.00 KB   16.00 KB       UNPARTITIONED
F02:EXCHANGE SENDER  3       3      0.000ns    0.000ns                        24.00 B    0
03:AGGREGATE         3       3      496.348ms  526.015ms  3       1           1.28 MB    16.00 KB
00:UNION             3       3      1.000ms    2.000ms    24.51M  23.34M      4.00 KB    0
|--02:SCAN KUDU      3       3      1s057ms    1s106ms    12.26M  -1          5.04 MB    7.50 MB        tpcds_10000_parquet.customer_kudu
01:SCAN HDFS         3       3      157.338ms  174.005ms  12.26M  23.34M      70.61 MB   352.00 MB      tpcds_10000_parquet.customer_parquet
{code}
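To make the two summaries easier to compare, the sketch below computes the per-operator change in Avg Time. The numbers are copied from the summary tables in this comment; the dictionary and function names are only illustrative, and one run per build is too little to draw firm conclusions.

```python
# Avg Time per operator in milliseconds, copied from the two summaries.
baseline_ms = {
    "03:AGGREGATE": 466.680,
    "00:UNION": 464.013,
    "02:SCAN KUDU": 843.025,
    "01:SCAN HDFS": 118.336,
}
patched_ms = {
    "03:AGGREGATE": 496.348,
    "00:UNION": 1.000,
    "02:SCAN KUDU": 1057.0,  # reported as 1s057ms
    "01:SCAN HDFS": 157.338,
}


def avg_time_deltas(before, after):
    """Return {operator: after - before} in milliseconds."""
    return {op: round(after[op] - before[op], 3) for op in before}


deltas = avg_time_deltas(baseline_ms, patched_ms)
for op, delta in deltas.items():
    print(f"{op:<14} {delta:+10.3f} ms")
```

The UNION node's average time drops by about 463 ms, while the aggregate and both scan nodes are somewhat slower in the patched run. The operators may overlap in time, so the deltas should not simply be added up.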
> when union kudu table and hdfs table, union passthrough does not take effect
> ----------------------------------------------------------------------------
>
> Key: IMPALA-10785
> URL: https://issues.apache.org/jira/browse/IMPALA-10785
> Project: IMPALA
> Issue Type: Improvement
> Reporter: pengdou1990
> Assignee: pengdou1990
> Priority: Major
>
> IMPALA-3586 already supports union passthrough and brings great performance
> improvements for unions, but there are still some problems with a union between
> an HDFS table and a Kudu table. Several points cause the problem:
> # In the Kudu scan node's output TupleDescriptor, a string slot is 16B, while
> in the HDFS scan node's output TupleDescriptor, a string slot is 12B, causing a
> tuple memory layout mismatch.
> # In the Kudu scan node's output TupleDescriptor, a string slot is 16B, while
> in the Union output TupleDescriptor, a string slot is 12B, causing a tuple
> memory layout mismatch.
> # In the Kudu scan node, the row key slot is NOT NULL, while in the HDFS scan
> node, NOT NULL slot information cannot be obtained from the metadata, causing a
> tuple memory layout mismatch.
> I have resolved the 1st and 2nd points; how should I handle the 3rd point?
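
The three points above all come down to whether two tuple descriptors describe the same byte layout. The following is a minimal sketch of that check, not Impala's actual code: the `Slot` type, the `layout`, `tuple_size`, `can_pass_through` and `make_tuple` helpers, and the simplified packing rule (slots laid out back to back, with no null-indicator bytes or alignment) are all assumptions made for illustration. The 12-byte and 16-byte string slot sizes come from the description; the 4-byte INT slot is assumed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Slot:
    name: str
    size: int       # bytes the slot occupies in the tuple
    nullable: bool


def layout(slots):
    """Return [(offset, size, nullable)] for slots packed back to back."""
    result, offset = [], 0
    for s in slots:
        result.append((offset, s.size, s.nullable))
        offset += s.size
    return result


def tuple_size(slots):
    return sum(s.size for s in slots)


def can_pass_through(child, union):
    """Passthrough is only safe if both layouts match slot for slot."""
    return layout(child) == layout(union)


def make_tuple(string_size, key_nullable):
    # Hypothetical helper: the five columns selected by the test query.
    cols = ["c_customer_id", "c_salutation", "c_first_name", "c_last_name"]
    return [Slot("c_customer_sk", 4, key_nullable)] + [
        Slot(c, string_size, True) for c in cols
    ]


hdfs = make_tuple(string_size=12, key_nullable=True)
kudu = make_tuple(string_size=16, key_nullable=False)
hdfs_wide = make_tuple(string_size=16, key_nullable=True)

print(tuple_size(hdfs), tuple_size(kudu))   # 52 68
print(can_pass_through(kudu, hdfs))         # False: string slot sizes differ
print(can_pass_through(kudu, hdfs_wide))    # False: key nullability differs
print(can_pass_through(hdfs_wide, hdfs_wide))  # True
```

Under these simplifying assumptions the sums 52 and 68 agree with the row-size=52B and row-size=68B values reported in the plans in this thread. The second `False` corresponds to the 3rd point: even with equal string slot sizes, a NOT NULL key on one side and a nullable key on the other still counts as a layout mismatch in this model.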