[
https://issues.apache.org/jira/browse/IMPALA-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628287#comment-16628287
]
ASF subversion and git services commented on IMPALA-110:
--------------------------------------------------------
Commit df53ec2385190bba2b3cefb43b094cde6d33642f in impala's branch
refs/heads/master from [~twmarshall]
[ https://git-wip-us.apache.org/repos/asf?p=impala.git;h=df53ec2 ]
IMPALA-110: Support for multiple DISTINCT
This patch adds support for having multiple aggregate functions in a
single SELECT block that use DISTINCT over different sets of columns.
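As a rough illustration of the semantics this enables (hypothetical table and column names, borrowed from the examples further down), a query like SELECT count(distinct i_class_id), count(distinct i_brand_id) FROM item computes independent distinct counts over different columns of the same rows. A minimal Python sketch of that semantics:

```python
# Sketch of the feature's semantics: several DISTINCT aggregates over
# *different* columns in one SELECT block. Rows stand in for a
# hypothetical 'item' table.
rows = [
    {"i_class_id": 1, "i_brand_id": 10},
    {"i_class_id": 1, "i_brand_id": 20},
    {"i_class_id": 2, "i_brand_id": 20},
]

def count_distinct(rows, col):
    """COUNT(DISTINCT col) over the rows."""
    return len({r[col] for r in rows})

# SELECT count(distinct i_class_id), count(distinct i_brand_id) FROM item
result = (count_distinct(rows, "i_class_id"),
          count_distinct(rows, "i_brand_id"))
print(result)  # (2, 2)
```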
Planner design:
- The existing tree-based plan shape with a two-phased
aggregation is maintained.
- Existing plans are not changed.
- Aggregates are grouped into 'aggregation classes' based on the
  expressions in their DISTINCT portion, which is empty for
  non-distinct aggregates.
- The aggregation framework is generalized to simultaneously process
multiple aggregation classes within the tree-based plan. This
process splits the results of different aggregation classes into
separate rows, so a final aggregation is needed to transpose the
results into the desired form.
- Main challenge: Each aggregation class consumes and produces
different tuples, so conceptually a union-type of tuples flows
through the runtime. The tuple union is represented by a TupleRow
with one tuple per aggregation class. Only one tuple in such a
TupleRow is non-NULL.
- Backend exec nodes in the aggregation plan will be aware of this
tuple-union either explicitly in their implementation or by relying
on expressions that distinguish the aggregation classes.
- To distinguish the aggregation classes, e.g. in hash exchanges,
CASE expressions are crafted to hash/group on the appropriate slots.
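The grouping step above can be sketched as follows. This is a simplified model, not Impala's actual FE code: each aggregate is reduced to a (function, args, is_distinct) triple, and aggregates are keyed by their DISTINCT expression list, with all non-distinct aggregates sharing a single empty-key class:

```python
from collections import OrderedDict

# Simplified model of grouping aggregates into 'aggregation classes'.
# Hypothetical column names; the triples model (func, args, is_distinct).
aggs = [
    ("count", ("i_class_id",), True),
    ("count", ("i_brand_id",), True),
    ("sum",   ("i_price",),    False),
    ("min",   ("i_class_id",), True),   # same DISTINCT exprs as the first
]

def aggregation_classes(aggs):
    classes = OrderedDict()
    for fn, args, is_distinct in aggs:
        # Key on the DISTINCT expressions; non-distinct aggregates
        # all share the single empty-key class.
        key = args if is_distinct else ()
        classes.setdefault(key, []).append((fn, args))
    return classes

classes = aggregation_classes(aggs)
print(list(classes))  # [('i_class_id',), ('i_brand_id',), ()]
```

Note how the two aggregates over i_class_id land in one class: they can share the same distinct pre-aggregation, which is why classes are keyed by expression list rather than by aggregate.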
Deferred FE work:
- Beautify/condense the long CASE exprs
- Push applicable conjuncts into individual aggregators before
the transposition step
- Added a few testing TODOs to reduce the size of this patch
- Decide whether we want to change existing plans to the new model
Execution design:
- Previous patches separated out aggregation logic from the exec node
into Aggregators. This is extended to support multiple Aggregators
per node, with different grouping and aggregating functions.
- There is a fast path for aggregations with only one aggregator,
which leaves the execution essentially unchanged from before.
- When there are multiple aggregators, the first aggregation node in
the plan replicates its input to each aggregator. The output of this
step is rows where only a single tuple is non-null, corresponding to
the aggregator that produced the row.
- A new expr is introduced, ValidTupleId, which takes one of these
rows and returns which tuple is non-null.
- For additional aggregation nodes, the input is split apart into
'mini-batches' according to which aggregator the row corresponds to.
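The tuple-union flow above can be modeled as a toy sketch (hypothetical helper names; real exec nodes operate on tuple layouts, not Python lists): each input row is replicated into one row per aggregation class with only that class's slot non-null, a ValidTupleId-style function recovers which class a row belongs to, and later nodes use it to split rows into per-aggregator mini-batches:

```python
# Toy model of the tuple-union TupleRow: a list with one slot per
# aggregation class, exactly one of which is non-None.
NUM_CLASSES = 2

def replicate(row):
    """First aggregation step: feed a copy of the input row to each
    aggregation class, producing one tuple-union row per class."""
    out = []
    for i in range(NUM_CLASSES):
        tuple_row = [None] * NUM_CLASSES
        tuple_row[i] = row
        out.append(tuple_row)
    return out

def valid_tuple_id(tuple_row):
    """Analogue of the ValidTupleId expr: index of the non-None tuple."""
    for i, t in enumerate(tuple_row):
        if t is not None:
            return i
    raise ValueError("no valid tuple in row")

def split_into_mini_batches(tuple_rows):
    """Later aggregation steps: route each row to its aggregator."""
    batches = [[] for _ in range(NUM_CLASSES)]
    for tr in tuple_rows:
        batches[valid_tuple_id(tr)].append(tr)
    return batches

rows = [{"i_class_id": 1, "i_brand_id": 10}]
union_rows = replicate(rows[0])
batches = split_into_mini_batches(union_rows)
print([len(b) for b in batches])  # [1, 1]
```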
Testing:
- Added analyzer and planner tests
- Added end-to-end query tests
- Ran hdfs/core tests
- Added support in the query generator and ran in a loop.
Change-Id: I055402eaef6d81e5f70e850d9f8a621e766830a4
Reviewed-on: http://gerrit.cloudera.org:8080/10771
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
> Add support for multiple distinct operators in the same query block
> -------------------------------------------------------------------
>
> Key: IMPALA-110
> URL: https://issues.apache.org/jira/browse/IMPALA-110
> Project: IMPALA
> Issue Type: New Feature
> Components: Backend, Frontend
> Affects Versions: Impala 0.5, Impala 1.4, Impala 2.0, Impala 2.2, Impala 2.3.0
> Reporter: Greg Rahn
> Assignee: Thomas Tauber-Marshall
> Priority: Major
> Labels: sql-language
>
> Impala only allows a single (DISTINCT columns) expression in each query.
> {color:red}Note:
> If you do not need precise accuracy, you can produce an estimate of the
> distinct values for a column by specifying NDV(column); a query can contain
> multiple instances of NDV(column). To make Impala automatically rewrite
> COUNT(DISTINCT) expressions to NDV(), enable the APPX_COUNT_DISTINCT query
> option.
> {color}
> {code}
> [impala:21000] > select count(distinct i_class_id) from item;
> Query: select count(distinct i_class_id) from item
> Query finished, fetching results ...
> 16
> Returned 1 row(s) in 1.51s
> {code}
> {code}
> [impala:21000] > select count(distinct i_class_id), count(distinct i_brand_id) from item;
> Query: select count(distinct i_class_id), count(distinct i_brand_id) from item
> ERROR: com.cloudera.impala.common.AnalysisException: Analysis exception (in
> select count(distinct i_class_id), count(distinct i_brand_id) from item)
> at com.cloudera.impala.analysis.AnalysisContext.analyze(AnalysisContext.java:133)
> at com.cloudera.impala.service.Frontend.createExecRequest(Frontend.java:221)
> at com.cloudera.impala.service.JniFrontend.createExecRequest(JniFrontend.java:89)
> Caused by: com.cloudera.impala.common.AnalysisException: all DISTINCT
> aggregate functions need to have the same set of parameters as
> COUNT(DISTINCT i_class_id); deviating function: COUNT(DISTINCT i_brand_id)
> at com.cloudera.impala.analysis.AggregateInfo.createDistinctAggInfo(AggregateInfo.java:196)
> at com.cloudera.impala.analysis.AggregateInfo.create(AggregateInfo.java:143)
> at com.cloudera.impala.analysis.SelectStmt.createAggInfo(SelectStmt.java:466)
> at com.cloudera.impala.analysis.SelectStmt.analyzeAggregation(SelectStmt.java:347)
> at com.cloudera.impala.analysis.SelectStmt.analyze(SelectStmt.java:155)
> at com.cloudera.impala.analysis.AnalysisContext.analyze(AnalysisContext.java:130)
> ... 2 more
> {code}
> Hive supports this:
> {code}
> $ hive -e "select count(distinct i_class_id), count(distinct i_brand_id) from item;"
> Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
> Hive history file=/tmp/grahn/hive_job_log_grahn_201303052234_1625576708.txt
> Total MapReduce jobs = 1
> Launching Job 1 out of 1
> Number of reduce tasks determined at compile time: 1
> In order to change the average load for a reducer (in bytes):
> set hive.exec.reducers.bytes.per.reducer=<number>
> In order to limit the maximum number of reducers:
> set hive.exec.reducers.max=<number>
> In order to set a constant number of reducers:
> set mapred.reduce.tasks=<number>
> Starting Job = job_201302081514_0073, Tracking URL = http://impala:50030/jobdetails.jsp?jobid=job_201302081514_0073
> Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=m0525.mtv.cloudera.com:8021 -kill job_201302081514_0073
> Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
> 2013-03-05 22:34:43,255 Stage-1 map = 0%, reduce = 0%
> 2013-03-05 22:34:49,323 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 4.81 sec
> 2013-03-05 22:34:50,337 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 4.81 sec
> 2013-03-05 22:34:51,351 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 4.81 sec
> 2013-03-05 22:34:52,360 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 4.81 sec
> 2013-03-05 22:34:53,370 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 4.81 sec
> 2013-03-05 22:34:54,379 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 4.81 sec
> 2013-03-05 22:34:55,389 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8.58 sec
> 2013-03-05 22:34:56,402 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8.58 sec
> 2013-03-05 22:34:57,413 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8.58 sec
> 2013-03-05 22:34:58,424 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8.58 sec
> MapReduce Total cumulative CPU time: 8 seconds 580 msec
> Ended Job = job_201302081514_0073
> MapReduce Jobs Launched:
> Job 0: Map: 1  Reduce: 1  Cumulative CPU: 8.58 sec  HDFS Read: 0  HDFS Write: 0  SUCCESS
> Total MapReduce CPU Time Spent: 8 seconds 580 msec
> OK
> 16 952
> Time taken: 25.666 seconds
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)