[
https://issues.apache.org/jira/browse/IMPALA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218258#comment-17218258
]
gaoxiaoqing edited comment on IMPALA-10253 at 10/26/20, 9:48 AM:
-----------------------------------------------------------------
[~tarmstrong] Thanks for the great suggestion. We want to avoid the join
overhead, and some complicated conditions evaluated on the join node can't be
pushed down to the scan node.
e.g.
{code:java}
CREATE VIEW rawdata.test_join_table AS
SELECT
-- +straight_join
events.*, sa_external_view_dim_2.event_id p_event_id,
sa_external_view_dim_2.event_name p_event_name FROM rawdata.event_ros_p7 events
LEFT OUTER JOIN
-- +BROADCAST
rawdata.event_dimension_table sa_external_view_dim_2 ON events.event_id =
sa_external_view_dim_2.event_id
{code}
{code:java}
explain select p_event_name from test_join_table where p_event_name='SignUp' or
(p_event_name='ViewProduct' and p__city="北京") group by p_event_name
{code}
This is time-consuming because:
* the predicate p_event_name='SignUp' or (p_event_name='ViewProduct' and p__city="北京") is evaluated on the join node and cannot be pushed down to the scan node;
* group by p_event_name needs remapping on the aggregation node;
* the join itself adds overhead.
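The first point is the filter-pushdown rewrite. A minimal sketch of the idea outside Impala, assuming a dict file in the `id,name` format from the issue description (the function names `load_dict` and `rewrite_in_predicate` are hypothetical, not Impala APIs): reverse-map the string constants to ids so the predicate can run directly on `event_id` at the scan node.

```python
# Sketch of the scan-phase rewrite: reverse-map dict names to ids so a
# predicate like dict(event_id) IN ('SignUp', ...) can be pushed down as
# event_id IN (1, ...). All names here are hypothetical.

def load_dict(lines):
    """Parse 'id,name' lines into a forward map {id: name}."""
    fwd = {}
    for line in lines:
        ident, name = line.strip().split(",", 1)
        fwd[int(ident)] = name
    return fwd

def rewrite_in_predicate(names, fwd):
    """Rewrite dict(col) IN (names...) into the id list for col IN (ids...)."""
    rev = {name: ident for ident, name in fwd.items()}
    return sorted(rev[name] for name in names if name in rev)

fwd = load_dict(["1,SignUp", "2,ViewProduct"])
print(rewrite_in_predicate(["SignUp", "ViewProduct"], fwd))  # [1, 2]
```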
So we chose the UDF. In addition to rewriting the filter in the scan phase, I
think it's necessary to rewrite the query during aggregation, because this
reduces the amount of data that needs to be remapped.
e.g.
before:
{code:java}
select event, count(*)
from event_external_view_p7
group by event;
{code}
after rewrite, less data needs to be reverse-mapped:
{code:java}
select dict(event_id, '/path/to/dict'), count(*)
from (
select event_id, count(*)
from event_external_view_p7
group by event_id
) a;
{code}
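The saving above can be illustrated with a rough count of dict lookups (a toy simulation with hypothetical helper names, not Impala code): grouping on event_id first means the dict is consulted once per distinct group instead of once per row.

```python
from collections import Counter

def count_per_row(rows, fwd):
    """Original plan: remap every row, then group by the mapped name."""
    lookups = 0
    groups = Counter()
    for event_id in rows:
        lookups += 1               # dict() evaluated once per input row
        groups[fwd[event_id]] += 1
    return groups, lookups

def count_per_group(rows, fwd):
    """Rewritten plan: group by event_id, remap only the group keys."""
    inner = Counter(rows)          # group by event_id, no dict calls
    lookups = len(inner)           # dict() evaluated once per group
    groups = Counter({fwd[k]: v for k, v in inner.items()})
    return groups, lookups

fwd = {1: "SignUp", 2: "ViewProduct"}
rows = [1, 2, 1, 1, 2, 1]
g1, n1 = count_per_row(rows, fwd)
g2, n2 = count_per_group(rows, fwd)
print(g1 == g2, n1, n2)  # True 6 2
```

Both plans produce the same groups, but the rewritten one performs a number of lookups bounded by the dict's cardinality rather than by the table size.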
e.g.
before:
{code:java}
select funnel_id, count(*) from (
select case when event = "SignUp" then 1 when event = "ViewProduct" then 2 else
0 end as funnel_id
from event_test_hue_view_p1) a group by funnel_id
{code}
after rewrite, we don't need to reverse-map any row data:
{code:java}
select funnel_id, count(*) from (
select case when event_id = 1 then 1 when event_id = 2 then 2 else 0 end as
funnel_id
from event_test_hue_view_p1) a group by funnel_id
{code}
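The same reverse mapping applies inside the CASE expression: each `event = "name"` comparison can be rewritten to `event_id = id` at plan time, so the dict is never consulted per row. A hedged sketch of that constant rewrite (the helper name and branch representation are hypothetical):

```python
def rewrite_case_branches(branches, rev):
    """Rewrite (name, result) CASE branches into (id, result) branches,
    e.g. WHEN event = 'SignUp' THEN 1 -> WHEN event_id = 1 THEN 1.
    branches: list of (string constant, result); rev: {name: id}."""
    return [(rev[name], result) for name, result in branches if name in rev]

rev = {"SignUp": 1, "ViewProduct": 2}
print(rewrite_case_branches([("SignUp", 1), ("ViewProduct", 2)], rev))
# [(1, 1), (2, 2)]
```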
Maybe this JIRA can contain multiple commits?
1. rewrite filter expressions in the scan phase
2. rewrite expressions in the aggregate phase
> Improve query performance contains dict function
> ------------------------------------------------
>
> Key: IMPALA-10253
> URL: https://issues.apache.org/jira/browse/IMPALA-10253
> Project: IMPALA
> Issue Type: New Feature
> Components: Frontend
> Reporter: gaoxiaoqing
> Assignee: gaoxiaoqing
> Priority: Major
>
> We have the following Parquet table:
> {code:java}
> CREATE EXTERNAL TABLE rawdata.event_ros_p1 (
> event_id INT,
> user_id BIGINT,
> time TIMESTAMP,
> p_abook_type STRING
> )
> PARTITIONED BY (
> day INT,
> event_bucket INT
> )
> STORED AS PARQUET
> LOCATION 'hdfs://localhost:20500/sa/data/1/event'
> {code}
> The data looks as follows:
> ||event_id||user_id||time||p_abook_type||
> |1|-922235446862664806|2018-07-18 09:01:06.158|小说|
> |2|-922235446862664806|2018-07-19 09:01:06.158|小说|
> If we want to remap event_id to the real event name, we can implement a dict
> UDF. The dict UDF is defined as DICT(BIGINT expression, STRING path): the
> first parameter is the column, and the second parameter is an HDFS path which
> stores the mapping rules, like this:
> {code:java}
> 1,SignUp
> 2,ViewProduct{code}
> Then build a view which adds the dict column to the original table:
> {code:java}
> CREATE VIEW rawdata.event_external_view_p7 AS SELECT events.*,
> dict(`event_id`, '/data/1/event.txt') AS `event` FROM rawdata.event_view_p7
> events
> {code}
> If the query's GROUP BY column uses dict, the query is slower than grouping
> by the original column. When we EXPLAIN the SQL, we find that every row needs
> remapping in both the SCAN phase and the AGGREGATE phase.
> {code:java}
> select event, count(*) from event_external_view_p7 where event in ('SignUp',
> 'ViewProduct') group by event;{code}
> {code:java}
> PLAN-ROOT SINK
> |
> 04:EXCHANGE [UNPARTITIONED]
> |
> 03:AGGREGATE [FINALIZE]
> | output: count:merge(*)
> | group by: event
> | row-size=20B cardinality=0
> |
> 02:EXCHANGE [HASH(event)]
> |
> 01:AGGREGATE [STREAMING]
> | output: count(*)
> | group by: rawdata.DICT(event_id, '/data/1/event.txt')
> | row-size=20B cardinality=0
> |
> 00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
> | partitions=39/39 files=99 size=9.00GB
> | predicates: rawdata.DICT(event_id, '/data/1/event.txt') IN ('SignUp',
> 'ViewProduct')
> | row-size=4B cardinality=unavailable
> {code}
> The idea is to modify the plan: use the original column in the SCAN and
> AGGREGATE phases, and remap the original column only at the end. The new plan
> looks like this:
> {code:java}
> PLAN-ROOT SINK
> |
> 05:SELECT [FINALIZE]
> | output: dict(event_id)
> | row-size=20B cardinality=0
> |
> 04:EXCHANGE [UNPARTITIONED]
> |
> 03:AGGREGATE [FINALIZE]
> | output: count:merge(*)
> | group by: event_id
> | row-size=20B cardinality=0
> |
> 02:EXCHANGE [HASH(event)]
> |
> 01:AGGREGATE [STREAMING]
> | output: count(*)
> | group by: event_id
> | row-size=20B cardinality=0
> |
> 00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
> | partitions=39/39 files=99 size=9.00GB
> | predicates: event_id IN (1, 2)
> | row-size=4B cardinality=unavailable
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]