[
https://issues.apache.org/jira/browse/IMPALA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218258#comment-17218258
]
gaoxiaoqing edited comment on IMPALA-10253 at 10/21/20, 12:17 PM:
------------------------------------------------------------------
[~tarmstrong] Thanks for the great suggestion. To avoid join overhead, we
chose the dict UDF. I'd be glad to assign this JIRA to myself.
In addition to rewriting filters in the scan phase, I think it's necessary
to rewrite queries that aggregate or sort, because this reduces the amount
of data that needs to be remapped.
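For reference, the lookup the dict UDF performs can be sketched roughly like this (Python just for illustration; the real UDF would be native Impala code, and load_dict/dict_udf are made-up names):

```python
# Sketch of what the dict UDF does: load an "id,name" mapping file
# (the format shown in the issue description) and translate one id per call.

def load_dict(path):
    """Parse the remapping file into an {id: name} dict."""
    mapping = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            key, name = line.split(",", 1)
            mapping[int(key)] = name
    return mapping

def dict_udf(event_id, mapping):
    """DICT(BIGINT expression, STRING path): map an id to its name."""
    return mapping.get(event_id)
```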
e.g.1
before:
{code:java}
select event, count(*)
from event_external_view_p7
group by event;
{code}
after rewrite:
{code:java}
select dict(event_id, '/path/to/dict'), cnt
from (
select event_id, count(*) as cnt
from event_external_view_p7
group by event_id
) a;
{code}
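To illustrate why this rewrite helps (a Python sketch with made-up in-memory data, not Impala code): grouping on the raw event_id first means the mapping runs once per distinct group instead of once per row:

```python
# Compare per-row mapping (before rewrite) with mapping after aggregation
# (after rewrite). MAPPING and 'rows' are illustration data only.
from collections import Counter

MAPPING = {1: "SignUp", 2: "ViewProduct"}

def group_by_event_before(rows):
    """Before rewrite: the dict lookup runs on every row, then we group."""
    return Counter(MAPPING[event_id] for event_id in rows)

def group_by_event_after(rows):
    """After rewrite: group on event_id, then map each group key once."""
    counts = Counter(rows)
    return {MAPPING[event_id]: n for event_id, n in counts.items()}
```

Both produce the same result, but the second version performs len(distinct ids) lookups instead of len(rows).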
e.g.2
before:
{code:java}
select max(event)
from event_external_view_p7;
{code}
after rewrite:
{code:java}
select dict(event_id, '/path/to/dict')
from (
select max(event_id) as event_id
from event_external_view_p7
) a;
{code}
e.g.3
before:
{code:java}
select * from event_external_view_p7 order by event;
{code}
after rewrite:
{code:java}
select * from event_external_view_p7 order by event_id;
{code}
Maybe this JIRA should be split into several commits:
1. rewrite filter expressions in the scan phase
2. rewrite expressions in the aggregate phase
3. rewrite expressions in the order by phase
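For the first item, the planner needs the inverse of the dict so that a predicate on names becomes a predicate on ids. A hypothetical Python sketch (invert and rewrite_in_predicate are made-up names, not planner APIs):

```python
# Sketch of the scan-phase filter rewrite: invert the id -> name mapping so
# that event IN ('SignUp', 'ViewProduct') becomes event_id IN (1, 2).
# Names absent from the dict match no rows and are simply dropped.

def invert(mapping):
    """Build a name -> id dict from the id -> name dict."""
    return {name: key for key, name in mapping.items()}

def rewrite_in_predicate(names, mapping):
    """Translate an IN-list of names into the equivalent IN-list of ids."""
    inverse = invert(mapping)
    return sorted(inverse[n] for n in names if n in inverse)
```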
> Improve query performance contains dict function
> ------------------------------------------------
>
> Key: IMPALA-10253
> URL: https://issues.apache.org/jira/browse/IMPALA-10253
> Project: IMPALA
> Issue Type: New Feature
> Components: Frontend
> Reporter: gaoxiaoqing
> Priority: Major
>
> we have the following parquet table:
> {code:java}
> CREATE EXTERNAL TABLE rawdata.event_ros_p1 (
> event_id INT,
> user_id BIGINT,
> time TIMESTAMP,
> p_abook_type STRING
> )
> PARTITIONED BY (
> day INT,
> event_bucket INT
> )
> STORED AS PARQUET
> LOCATION 'hdfs://localhost:20500/sa/data/1/event'
> {code}
> the data show as following:
> ||event_id||user_id||time||p_abook_type||
> |1|-922235446862664806|2018-07-18 09:01:06.158|小说|
> |2|-922235446862664806|2018-07-19 09:01:06.158|小说|
> If we want to remap event_id to the real event name, we can implement a
> dict UDF, defined as DICT(BIGINT expression, STRING path). The first
> parameter is the column; the second is an HDFS path that stores the
> remapping rules, like this:
> {code:java}
> 1,SignUp
> 2,ViewProduct
> {code}
> Then build a view that adds the dict column to the original table:
> {code:java}
> CREATE VIEW rawdata.event_external_view_p7 AS SELECT events.*,
> dict(`event_id`, '/data/1/event.txt') AS `event` FROM rawdata.event_view_p7
> events
> {code}
> If the query groups by the dict column, it is slower than grouping by the
> original column. Explaining the SQL shows that every row is remapped in
> both the SCAN phase and the AGGREGATE phase.
> {code:java}
> select event, count(*) from event_external_view_p7 where event in ('SignUp',
> 'ViewProduct') group by event;
> {code}
> {code:java}
> PLAN-ROOT SINK
> |
> 04:EXCHANGE [UNPARTITIONED]
> |
> 03:AGGREGATE [FINALIZE]
> | output: count:merge(*)
> | group by: event
> | row-size=20B cardinality=0
> |
> 02:EXCHANGE [HASH(event)]
> |
> 01:AGGREGATE [STREAMING]
> | output: count(*)
> | group by: rawdata.DICT(event_id, '/data/1/event.txt')
> | row-size=20B cardinality=0
> |
> 00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
> | partitions=39/39 files=99 size=9.00GB
> | predicates: rawdata.DICT(event_id, '/data/1/event.txt') IN ('SignUp',
> 'ViewProduct')
> | row-size=4B cardinality=unavailable
> {code}
> The idea is to modify the plan: use the original column in the SCAN and
> AGGREGATE phases and remap it only at the end. The new plan looks like
> this:
> {code:java}
> PLAN-ROOT SINK
> |
> 05:SELECT [FINALIZE]
> | output: dict(event_id)
> | row-size=20B cardinality=0
> |
> 04:EXCHANGE [UNPARTITIONED]
> |
> 03:AGGREGATE [FINALIZE]
> | output: count:merge(*)
> | group by: event_id
> | row-size=20B cardinality=0
> |
> 02:EXCHANGE [HASH(event_id)]
> |
> 01:AGGREGATE [STREAMING]
> | output: count(*)
> | group by: event_id
> | row-size=20B cardinality=0
> |
> 00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
> | partitions=39/39 files=99 size=9.00GB
> | predicates: event_id IN (1, 2)
> | row-size=4B cardinality=unavailable
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)