http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct.test
new file mode 100644
index 0000000..d6cd1f1
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct.test
@@ -0,0 +1,1280 @@
+# Multiple distinct without grouping.
+select count(distinct tinyint_col), sum(distinct int_col), count(distinct 
smallint_col)
+from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
sum(int_col)), aggif(valid_tid() = 6, count(smallint_col))
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: sum(int_col)
+|  Class 2
+|    output: count(smallint_col)
+|
+01:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
sum(int_col)), aggif(valid_tid() = 6, count(smallint_col))
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(tinyint_col)
+|  Class 1
+|    output: sum:merge(int_col)
+|  Class 2
+|    output: count:merge(smallint_col)
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: sum(int_col)
+|  Class 2
+|    output: count(smallint_col)
+|
+05:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 
THEN murmur_hash(int_col) WHEN 5 THEN murmur_hash(smallint_col) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct without grouping. First needs intermediate tuple.
+select avg(distinct tinyint_col), sum(distinct int_col), count(distinct 
smallint_col)
+from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 5, 
sum(int_col)), aggif(valid_tid() = 7, count(smallint_col))
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg(tinyint_col)
+|  Class 1
+|    output: sum(int_col)
+|  Class 2
+|    output: count(smallint_col)
+|
+01:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 5, 
sum(int_col)), aggif(valid_tid() = 7, count(smallint_col))
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(tinyint_col)
+|  Class 1
+|    output: sum:merge(int_col)
+|  Class 2
+|    output: count:merge(smallint_col)
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: avg(tinyint_col)
+|  Class 1
+|    output: sum(int_col)
+|  Class 2
+|    output: count(smallint_col)
+|
+05:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 4 
THEN murmur_hash(int_col) WHEN 6 THEN murmur_hash(smallint_col) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct without grouping. Last needs intermediate tuple.
+select count(distinct tinyint_col), sum(distinct int_col), avg(distinct 
smallint_col)
+from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
sum(int_col)), aggif(valid_tid() = 7, avg(smallint_col))
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: sum(int_col)
+|  Class 2
+|    output: avg(smallint_col)
+|
+01:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
sum(int_col)), aggif(valid_tid() = 7, avg(smallint_col))
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(tinyint_col)
+|  Class 1
+|    output: sum:merge(int_col)
+|  Class 2
+|    output: avg:merge(smallint_col)
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: sum(int_col)
+|  Class 2
+|    output: avg(smallint_col)
+|
+05:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 
THEN murmur_hash(int_col) WHEN 5 THEN murmur_hash(smallint_col) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct without grouping. All need intermediate tuples.
+select avg(distinct tinyint_col), avg(distinct int_col), avg(distinct 
smallint_col)
+from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 6, 
avg(int_col)), aggif(valid_tid() = 9, avg(smallint_col))
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg(tinyint_col)
+|  Class 1
+|    output: avg(int_col)
+|  Class 2
+|    output: avg(smallint_col)
+|
+01:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 6, 
avg(int_col)), aggif(valid_tid() = 9, avg(smallint_col))
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(tinyint_col)
+|  Class 1
+|    output: avg:merge(int_col)
+|  Class 2
+|    output: avg:merge(smallint_col)
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: avg(tinyint_col)
+|  Class 1
+|    output: avg(int_col)
+|  Class 2
+|    output: avg(smallint_col)
+|
+05:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 4 
THEN murmur_hash(int_col) WHEN 7 THEN murmur_hash(smallint_col) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: int_col
+|  Class 2
+|    group by: smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct with grouping.
+select bigint_col, count(distinct tinyint_col), sum(distinct int_col),
+       count(distinct smallint_col)
+from functional.alltypes group by bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
sum(int_col)), aggif(valid_tid() = 6, count(smallint_col))
+|  group by: CASE valid_tid() WHEN 2 THEN bigint_col WHEN 4 THEN bigint_col 
WHEN 6 THEN bigint_col END
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: sum(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count(smallint_col)
+|    group by: bigint_col
+|
+01:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
sum(int_col)), aggif(valid_tid() = 6, count(smallint_col))
+|  group by: CASE valid_tid() WHEN 2 THEN bigint_col WHEN 4 THEN bigint_col 
WHEN 6 THEN bigint_col END
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: sum:merge(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count:merge(smallint_col)
+|    group by: bigint_col
+|
+06:EXCHANGE [HASH(CASE valid_tid() WHEN 2 THEN murmur_hash(bigint_col) WHEN 4 
THEN murmur_hash(bigint_col) WHEN 6 THEN murmur_hash(bigint_col) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: count(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: sum(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count(smallint_col)
+|    group by: bigint_col
+|
+05:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(bigint_col) WHEN 3 
THEN murmur_hash(bigint_col) WHEN 5 THEN murmur_hash(bigint_col) END,CASE 
valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 THEN 
murmur_hash(int_col) WHEN 5 THEN murmur_hash(smallint_col) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct with grouping. First needs intermediate tuple.
+select bigint_col, avg(distinct tinyint_col), sum(distinct int_col),
+       count(distinct smallint_col)
+from functional.alltypes group by bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 5, 
sum(int_col)), aggif(valid_tid() = 7, count(smallint_col))
+|  group by: CASE valid_tid() WHEN 3 THEN bigint_col WHEN 5 THEN bigint_col 
WHEN 7 THEN bigint_col END
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: sum(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count(smallint_col)
+|    group by: bigint_col
+|
+01:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 5, 
sum(int_col)), aggif(valid_tid() = 7, count(smallint_col))
+|  group by: CASE valid_tid() WHEN 3 THEN bigint_col WHEN 5 THEN bigint_col 
WHEN 7 THEN bigint_col END
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: sum:merge(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count:merge(smallint_col)
+|    group by: bigint_col
+|
+06:EXCHANGE [HASH(CASE valid_tid() WHEN 2 THEN murmur_hash(bigint_col) WHEN 5 
THEN murmur_hash(bigint_col) WHEN 7 THEN murmur_hash(bigint_col) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: avg(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: sum(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count(smallint_col)
+|    group by: bigint_col
+|
+05:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(bigint_col) WHEN 4 
THEN murmur_hash(bigint_col) WHEN 6 THEN murmur_hash(bigint_col) END,CASE 
valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 4 THEN 
murmur_hash(int_col) WHEN 6 THEN murmur_hash(smallint_col) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct with grouping. Last needs intermediate tuple.
+select bigint_col, count(distinct tinyint_col), sum(distinct int_col),
+       avg(distinct smallint_col)
+from functional.alltypes group by bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
sum(int_col)), aggif(valid_tid() = 7, avg(smallint_col))
+|  group by: CASE valid_tid() WHEN 2 THEN bigint_col WHEN 4 THEN bigint_col 
WHEN 7 THEN bigint_col END
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: sum(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg(smallint_col)
+|    group by: bigint_col
+|
+01:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
sum(int_col)), aggif(valid_tid() = 7, avg(smallint_col))
+|  group by: CASE valid_tid() WHEN 2 THEN bigint_col WHEN 4 THEN bigint_col 
WHEN 7 THEN bigint_col END
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: sum:merge(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg:merge(smallint_col)
+|    group by: bigint_col
+|
+06:EXCHANGE [HASH(CASE valid_tid() WHEN 2 THEN murmur_hash(bigint_col) WHEN 4 
THEN murmur_hash(bigint_col) WHEN 6 THEN murmur_hash(bigint_col) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: count(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: sum(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg(smallint_col)
+|    group by: bigint_col
+|
+05:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(bigint_col) WHEN 3 
THEN murmur_hash(bigint_col) WHEN 5 THEN murmur_hash(bigint_col) END,CASE 
valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 THEN 
murmur_hash(int_col) WHEN 5 THEN murmur_hash(smallint_col) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct with grouping. All need intermediate tuples.
+select bigint_col, avg(distinct tinyint_col), avg(distinct int_col),
+       avg(distinct smallint_col)
+from functional.alltypes group by bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 6, 
avg(int_col)), aggif(valid_tid() = 9, avg(smallint_col))
+|  group by: CASE valid_tid() WHEN 3 THEN bigint_col WHEN 6 THEN bigint_col 
WHEN 9 THEN bigint_col END
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: avg(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg(smallint_col)
+|    group by: bigint_col
+|
+01:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 6, 
avg(int_col)), aggif(valid_tid() = 9, avg(smallint_col))
+|  group by: CASE valid_tid() WHEN 3 THEN bigint_col WHEN 6 THEN bigint_col 
WHEN 9 THEN bigint_col END
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: avg:merge(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg:merge(smallint_col)
+|    group by: bigint_col
+|
+06:EXCHANGE [HASH(CASE valid_tid() WHEN 2 THEN murmur_hash(bigint_col) WHEN 5 
THEN murmur_hash(bigint_col) WHEN 8 THEN murmur_hash(bigint_col) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: avg(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: avg(int_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg(smallint_col)
+|    group by: bigint_col
+|
+05:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(bigint_col) WHEN 4 
THEN murmur_hash(bigint_col) WHEN 7 THEN murmur_hash(bigint_col) END,CASE 
valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 4 THEN 
murmur_hash(int_col) WHEN 7 THEN murmur_hash(smallint_col) END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, int_col
+|  Class 2
+|    group by: bigint_col, smallint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct and non-distinct without grouping.
+select count(distinct tinyint_col), count(distinct smallint_col), 
count(int_col)
+from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
count(smallint_col)), aggif(valid_tid() = 5, count(int_col))
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: count(smallint_col)
+|  Class 2
+|    output: count:merge(int_col)
+|
+01:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: count(int_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
count(smallint_col)), aggif(valid_tid() = 5, count(int_col))
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(tinyint_col)
+|  Class 1
+|    output: count:merge(smallint_col)
+|  Class 2
+|    output: count:merge(int_col)
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: count(smallint_col)
+|  Class 2
+|    output: count:merge(int_col)
+|
+05:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: count:merge(int_col)
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 
THEN murmur_hash(smallint_col) WHEN 5 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: count(int_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct and non-distinct without grouping. First distinct needs
+# intermediate agg tuple.
+select avg(distinct tinyint_col), count(distinct smallint_col), count(int_col)
+from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 5, 
count(smallint_col)), aggif(valid_tid() = 6, count(int_col))
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg(tinyint_col)
+|  Class 1
+|    output: count(smallint_col)
+|  Class 2
+|    output: count:merge(int_col)
+|
+01:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: count(int_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 5, 
count(smallint_col)), aggif(valid_tid() = 6, count(int_col))
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(tinyint_col)
+|  Class 1
+|    output: count:merge(smallint_col)
+|  Class 2
+|    output: count:merge(int_col)
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: avg(tinyint_col)
+|  Class 1
+|    output: count(smallint_col)
+|  Class 2
+|    output: count:merge(int_col)
+|
+05:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: count:merge(int_col)
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 4 
THEN murmur_hash(smallint_col) WHEN 6 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: count(int_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct and non-distinct without grouping. Non-distinct needs
+# intermediate agg tuple.
+select count(distinct tinyint_col), count(distinct smallint_col), avg(int_col)
+from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
count(smallint_col)), aggif(valid_tid() = 6, avg(int_col))
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: count(smallint_col)
+|  Class 2
+|    output: avg:merge(int_col)
+|
+01:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: avg(int_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
count(smallint_col)), aggif(valid_tid() = 6, avg(int_col))
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(tinyint_col)
+|  Class 1
+|    output: count:merge(smallint_col)
+|  Class 2
+|    output: avg:merge(int_col)
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: count(tinyint_col)
+|  Class 1
+|    output: count(smallint_col)
+|  Class 2
+|    output: avg:merge(int_col)
+|
+05:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 
THEN murmur_hash(smallint_col) WHEN 5 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: avg(int_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct and non-distinct without grouping. All need intermediate agg tuples.
+select avg(distinct tinyint_col), avg(distinct smallint_col), avg(int_col)
+from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 6, 
avg(smallint_col)), aggif(valid_tid() = 8, avg(int_col))
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg(tinyint_col)
+|  Class 1
+|    output: avg(smallint_col)
+|  Class 2
+|    output: avg:merge(int_col)
+|
+01:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: avg(int_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 6, 
avg(smallint_col)), aggif(valid_tid() = 8, avg(int_col))
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(tinyint_col)
+|  Class 1
+|    output: avg:merge(smallint_col)
+|  Class 2
+|    output: avg:merge(int_col)
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  Class 0
+|    output: avg(tinyint_col)
+|  Class 1
+|    output: avg(smallint_col)
+|  Class 2
+|    output: avg:merge(int_col)
+|
+05:AGGREGATE
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 4 
THEN murmur_hash(smallint_col) WHEN 7 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: tinyint_col
+|  Class 1
+|    group by: smallint_col
+|  Class 2
+|    output: avg(int_col)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct and non-distinct with grouping.
+select bigint_col, count(distinct tinyint_col), count(distinct smallint_col),
+       count(int_col)
+from functional.alltypes group by bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
count(smallint_col)), aggif(valid_tid() = 5, count(int_col))
+|  group by: CASE valid_tid() WHEN 2 THEN bigint_col WHEN 4 THEN bigint_col 
WHEN 5 THEN bigint_col END
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: count(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count:merge(int_col)
+|    group by: bigint_col
+|
+01:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: count(int_col)
+|    group by: bigint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
count(smallint_col)), aggif(valid_tid() = 5, count(int_col))
+|  group by: CASE valid_tid() WHEN 2 THEN bigint_col WHEN 4 THEN bigint_col 
WHEN 5 THEN bigint_col END
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: count:merge(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count:merge(int_col)
+|    group by: bigint_col
+|
+06:EXCHANGE [HASH(CASE valid_tid() WHEN 2 THEN murmur_hash(bigint_col) WHEN 4 
THEN murmur_hash(bigint_col) WHEN 5 THEN murmur_hash(bigint_col) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: count(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: count(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count:merge(int_col)
+|    group by: bigint_col
+|
+05:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: count:merge(int_col)
+|    group by: bigint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(bigint_col) WHEN 3 
THEN murmur_hash(bigint_col) WHEN 5 THEN murmur_hash(bigint_col) END,CASE 
valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 THEN 
murmur_hash(smallint_col) WHEN 5 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: count(int_col)
+|    group by: bigint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct and non-distinct with grouping. First distinct needs
+# intermediate agg tuple.
+select bigint_col, avg(distinct tinyint_col), count(distinct smallint_col),
+       count(int_col)
+from functional.alltypes group by bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 5, 
count(smallint_col)), aggif(valid_tid() = 6, count(int_col))
+|  group by: CASE valid_tid() WHEN 3 THEN bigint_col WHEN 5 THEN bigint_col 
WHEN 6 THEN bigint_col END
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: count(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count:merge(int_col)
+|    group by: bigint_col
+|
+01:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: count(int_col)
+|    group by: bigint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 5, 
count(smallint_col)), aggif(valid_tid() = 6, count(int_col))
+|  group by: CASE valid_tid() WHEN 3 THEN bigint_col WHEN 5 THEN bigint_col 
WHEN 6 THEN bigint_col END
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: count:merge(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count:merge(int_col)
+|    group by: bigint_col
+|
+06:EXCHANGE [HASH(CASE valid_tid() WHEN 2 THEN murmur_hash(bigint_col) WHEN 5 
THEN murmur_hash(bigint_col) WHEN 6 THEN murmur_hash(bigint_col) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: avg(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: count(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: count:merge(int_col)
+|    group by: bigint_col
+|
+05:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: count:merge(int_col)
+|    group by: bigint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(bigint_col) WHEN 4 
THEN murmur_hash(bigint_col) WHEN 6 THEN murmur_hash(bigint_col) END,CASE 
valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 4 THEN 
murmur_hash(smallint_col) WHEN 6 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: count(int_col)
+|    group by: bigint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct and non-distinct with grouping. Non-distinct needs
+# intermediate agg tuple.
+select bigint_col, count(distinct tinyint_col), count(distinct smallint_col),
+       avg(int_col)
+from functional.alltypes group by bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
count(smallint_col)), aggif(valid_tid() = 6, avg(int_col))
+|  group by: CASE valid_tid() WHEN 2 THEN bigint_col WHEN 4 THEN bigint_col 
WHEN 6 THEN bigint_col END
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: count(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|    group by: bigint_col
+|
+01:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: avg(int_col)
+|    group by: bigint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 2, count(tinyint_col)), aggif(valid_tid() = 4, 
count(smallint_col)), aggif(valid_tid() = 6, avg(int_col))
+|  group by: CASE valid_tid() WHEN 2 THEN bigint_col WHEN 4 THEN bigint_col 
WHEN 6 THEN bigint_col END
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: count:merge(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: count:merge(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|    group by: bigint_col
+|
+06:EXCHANGE [HASH(CASE valid_tid() WHEN 2 THEN murmur_hash(bigint_col) WHEN 4 
THEN murmur_hash(bigint_col) WHEN 5 THEN murmur_hash(bigint_col) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: count(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: count(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|    group by: bigint_col
+|
+05:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|    group by: bigint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(bigint_col) WHEN 3 
THEN murmur_hash(bigint_col) WHEN 5 THEN murmur_hash(bigint_col) END,CASE 
valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 3 THEN 
murmur_hash(smallint_col) WHEN 5 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: avg(int_col)
+|    group by: bigint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Multiple distinct and non-distinct with grouping. All need intermediate 
agg tuples.
+select bigint_col, avg(distinct tinyint_col), avg(distinct smallint_col),
+       avg(int_col)
+from functional.alltypes group by bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 6, 
avg(smallint_col)), aggif(valid_tid() = 8, avg(int_col))
+|  group by: CASE valid_tid() WHEN 3 THEN bigint_col WHEN 6 THEN bigint_col 
WHEN 8 THEN bigint_col END
+|
+02:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: avg(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|    group by: bigint_col
+|
+01:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: avg(int_col)
+|    group by: bigint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid() = 3, avg(tinyint_col)), aggif(valid_tid() = 6, 
avg(smallint_col)), aggif(valid_tid() = 8, avg(int_col))
+|  group by: CASE valid_tid() WHEN 3 THEN bigint_col WHEN 6 THEN bigint_col 
WHEN 8 THEN bigint_col END
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: avg:merge(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|    group by: bigint_col
+|
+06:EXCHANGE [HASH(CASE valid_tid() WHEN 2 THEN murmur_hash(bigint_col) WHEN 5 
THEN murmur_hash(bigint_col) WHEN 7 THEN murmur_hash(bigint_col) END)]
+|
+02:AGGREGATE [STREAMING]
+|  Class 0
+|    output: avg(tinyint_col)
+|    group by: bigint_col
+|  Class 1
+|    output: avg(smallint_col)
+|    group by: bigint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|    group by: bigint_col
+|
+05:AGGREGATE
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: avg:merge(int_col)
+|    group by: bigint_col
+|
+04:EXCHANGE [HASH(CASE valid_tid() WHEN 1 THEN murmur_hash(bigint_col) WHEN 4 
THEN murmur_hash(bigint_col) WHEN 7 THEN murmur_hash(bigint_col) END,CASE 
valid_tid() WHEN 1 THEN murmur_hash(tinyint_col) WHEN 4 THEN 
murmur_hash(smallint_col) WHEN 7 THEN 0 END)]
+|
+01:AGGREGATE [STREAMING]
+|  Class 0
+|    group by: bigint_col, tinyint_col
+|  Class 1
+|    group by: bigint_col, smallint_col
+|  Class 2
+|    output: avg(int_col)
+|    group by: bigint_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/testdata/workloads/functional-query/queries/QueryTest/multiple-distinct-aggs.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/multiple-distinct-aggs.test
 
b/testdata/workloads/functional-query/queries/QueryTest/multiple-distinct-aggs.test
new file mode 100644
index 0000000..b3ac83d
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/multiple-distinct-aggs.test
@@ -0,0 +1,424 @@
+====
+---- QUERY
+# Distinct and non-distinct without grouping.
+select count(distinct tinyint_col), count(smallint_col) from alltypes
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+10,7300
+====
+---- QUERY
+# Distinct and non-distinct without grouping. Distinct needs intermediate agg 
tuple.
+select avg(distinct tinyint_col), count(smallint_col) from alltypes
+---- TYPES
+DOUBLE,BIGINT
+---- RESULTS
+4.5,7300
+====
+---- QUERY
+# Distinct and non-distinct without grouping. Non-distinct needs intermediate 
agg tuple.
+select count(distinct tinyint_col), avg(smallint_col) from alltypes
+---- TYPES
+BIGINT,DOUBLE
+---- RESULTS
+10,4.5
+====
+---- QUERY
+# Distinct and non-distinct without grouping. Both need intermediate agg 
tuples.
+select avg(distinct tinyint_col), avg(smallint_col) from alltypes
+---- TYPES
+DOUBLE,DOUBLE
+---- RESULTS
+4.5,4.5
+====
+---- QUERY
+# Distinct and non-distinct with grouping.
+select bigint_col, count(distinct tinyint_col), count(smallint_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,BIGINT,BIGINT
+---- RESULTS
+0,1,730
+10,1,730
+20,1,730
+30,1,730
+40,1,730
+50,1,730
+60,1,730
+70,1,730
+80,1,730
+90,1,730
+====
+---- QUERY
+# Distinct and non-distinct with grouping. Distinct needs intermediate agg 
tuple.
+select bigint_col, avg(distinct tinyint_col), count(smallint_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,DOUBLE,BIGINT
+---- RESULTS
+0,0,730
+10,1,730
+20,2,730
+30,3,730
+40,4,730
+50,5,730
+60,6,730
+70,7,730
+80,8,730
+90,9,730
+====
+---- QUERY
+# Distinct and non-distinct with grouping. Non-distinct needs intermediate agg 
tuple.
+select bigint_col, count(distinct tinyint_col), avg(smallint_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,BIGINT,DOUBLE
+---- RESULTS
+0,1,0
+10,1,1
+20,1,2
+30,1,3
+40,1,4
+50,1,5
+60,1,6
+70,1,7
+80,1,8
+90,1,9
+====
+---- QUERY
+# Distinct and non-distinct with grouping. Both need intermediate agg tuples.
+select bigint_col, avg(distinct tinyint_col), avg(smallint_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,DOUBLE,DOUBLE
+---- RESULTS
+0,0,0
+10,1,1
+20,2,2
+30,3,3
+40,4,4
+50,5,5
+60,6,6
+70,7,7
+80,8,8
+90,9,9
+====
+---- QUERY
+# Multiple distinct without grouping.
+select count(distinct tinyint_col), sum(distinct int_col), count(distinct 
smallint_col)
+from alltypes
+---- TYPES
+BIGINT,BIGINT,BIGINT
+---- RESULTS
+10,45,10
+====
+---- QUERY
+# Multiple distinct without grouping. First needs intermediate tuple.
+select avg(distinct tinyint_col), sum(distinct int_col), count(distinct 
smallint_col)
+from alltypes
+---- TYPES
+DOUBLE,BIGINT,BIGINT
+---- RESULTS
+4.5,45,10
+====
+---- QUERY
+# Multiple distinct without grouping. Last needs intermediate tuple.
+select count(distinct tinyint_col), sum(distinct int_col), avg(distinct 
smallint_col)
+from alltypes
+---- TYPES
+BIGINT,BIGINT,DOUBLE
+---- RESULTS
+10,45,4.5
+====
+---- QUERY
+# Multiple distinct without grouping. All need intermediate tuples
+select avg(distinct tinyint_col), avg(distinct int_col), avg(distinct 
smallint_col)
+from alltypes
+---- TYPES
+DOUBLE,DOUBLE,DOUBLE
+---- RESULTS
+4.5,4.5,4.5
+====
+---- QUERY
+# Multiple distinct with grouping.
+select bigint_col, count(distinct tinyint_col), sum(distinct int_col),
+       count(distinct smallint_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+0,1,0,1
+10,1,1,1
+20,1,2,1
+30,1,3,1
+40,1,4,1
+50,1,5,1
+60,1,6,1
+70,1,7,1
+80,1,8,1
+90,1,9,1
+====
+---- QUERY
+# Multiple distinct with grouping. First needs intermediate tuple.
+select bigint_col, avg(distinct tinyint_col), sum(distinct int_col),
+       count(distinct smallint_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,DOUBLE,BIGINT,BIGINT
+---- RESULTS
+0,0,0,1
+10,1,1,1
+20,2,2,1
+30,3,3,1
+40,4,4,1
+50,5,5,1
+60,6,6,1
+70,7,7,1
+80,8,8,1
+90,9,9,1
+====
+---- QUERY
+# Multiple distinct with grouping. Last needs intermediate tuple.
+select bigint_col, count(distinct tinyint_col), sum(distinct int_col),
+       avg(distinct smallint_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,BIGINT,BIGINT,DOUBLE
+---- RESULTS
+0,1,0,0
+10,1,1,1
+20,1,2,2
+30,1,3,3
+40,1,4,4
+50,1,5,5
+60,1,6,6
+70,1,7,7
+80,1,8,8
+90,1,9,9
+====
+---- QUERY
+# Multiple distinct with grouping. All need intermediate tuples
+select bigint_col, avg(distinct tinyint_col), avg(distinct int_col),
+       avg(distinct smallint_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,DOUBLE,DOUBLE,DOUBLE
+---- RESULTS
+0,0,0,0
+10,1,1,1
+20,2,2,2
+30,3,3,3
+40,4,4,4
+50,5,5,5
+60,6,6,6
+70,7,7,7
+80,8,8,8
+90,9,9,9
+====
+---- QUERY
+# Multiple distinct and non-distinct without grouping.
+select count(distinct tinyint_col), count(distinct smallint_col), 
count(int_col)
+from alltypes
+---- TYPES
+BIGINT,BIGINT,BIGINT
+---- RESULTS
+10,10,7300
+====
+---- QUERY
+# Multiple distinct and non-distinct without grouping. First distinct needs
+# intermediate agg tuple.
+select avg(distinct tinyint_col), count(distinct smallint_col), count(int_col)
+from alltypes
+---- TYPES
+DOUBLE,BIGINT,BIGINT
+---- RESULTS
+4.5,10,7300
+====
+---- QUERY
+# Multiple distinct and non-distinct without grouping. Non-distinct needs
+# intermediate agg tuple.
+select count(distinct tinyint_col), count(distinct smallint_col), avg(int_col)
+from alltypes
+---- TYPES
+BIGINT,BIGINT,DOUBLE
+---- RESULTS
+10,10,4.5
+====
+---- QUERY
+# Multiple distinct and non-distinct without grouping. All need intermediate 
agg tuples.
+select avg(distinct tinyint_col), avg(distinct smallint_col), avg(int_col)
+from alltypes
+---- TYPES
+DOUBLE,DOUBLE,DOUBLE
+---- RESULTS
+4.5,4.5,4.5
+====
+---- QUERY
+# Multiple distinct and non-distinct with grouping.
+select bigint_col, count(distinct tinyint_col), count(distinct smallint_col),
+       count(int_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+0,1,1,730
+10,1,1,730
+20,1,1,730
+30,1,1,730
+40,1,1,730
+50,1,1,730
+60,1,1,730
+70,1,1,730
+80,1,1,730
+90,1,1,730
+====
+---- QUERY
+# Multiple distinct and non-distinct with grouping. First distinct needs
+# intermediate agg tuple.
+select bigint_col, avg(distinct tinyint_col), count(distinct smallint_col),
+       count(int_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,DOUBLE,BIGINT,BIGINT
+---- RESULTS
+0,0,1,730
+10,1,1,730
+20,2,1,730
+30,3,1,730
+40,4,1,730
+50,5,1,730
+60,6,1,730
+70,7,1,730
+80,8,1,730
+90,9,1,730
+====
+---- QUERY
+# Multiple distinct and non-distinct with grouping. Non-distinct needs
+# intermediate agg tuple.
+select bigint_col, count(distinct tinyint_col), count(distinct smallint_col),
+       avg(int_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,BIGINT,BIGINT,DOUBLE
+---- RESULTS
+0,1,1,0
+10,1,1,1
+20,1,1,2
+30,1,1,3
+40,1,1,4
+50,1,1,5
+60,1,1,6
+70,1,1,7
+80,1,1,8
+90,1,1,9
+====
+---- QUERY
+# Multiple distinct and non-distinct with grouping. All need intermediate 
agg tuples.
+select bigint_col, avg(distinct tinyint_col), avg(distinct smallint_col),
+       avg(int_col)
+from alltypes group by bigint_col
+---- TYPES
+BIGINT,DOUBLE,DOUBLE,DOUBLE
+---- RESULTS
+0,0,0,0
+10,1,1,1
+20,2,2,2
+30,3,3,3
+40,4,4,4
+50,5,5,5
+60,6,6,6
+70,7,7,7
+80,8,8,8
+90,9,9,9
+====
+---- QUERY
+# Multiple distinct with constant and null
+select count(distinct 0), count(distinct null) from alltypes
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+1,0
+====
+---- QUERY
+# Multiple distinct with agg that returns a string (group_concat)
+select id, count(distinct id), group_concat(distinct string_col)
+from alltypestiny group by id
+---- TYPES
+INT,BIGINT,STRING
+---- RESULTS
+4,1,'0'
+2,1,'0'
+6,1,'0'
+0,1,'0'
+7,1,'1'
+1,1,'1'
+5,1,'1'
+3,1,'1'
+====
+---- QUERY
+# Multiple distinct over more complex expressions
+select count(distinct id % 2),
+  count(distinct concat(string_col, 'a')) > 0,
+  sum(distinct tinyint_col * 0),
+  abs(count(distinct id) * 100)
+from alltypestiny;
+---- TYPES
+BIGINT,BOOLEAN,BIGINT,BIGINT
+---- RESULTS
+2,True,0,800
+====
+---- QUERY
+# Multiple distinct inside a subplan
+select id, v.cnt, v.sm
+from functional_parquet.complextypestbl a cross join
+  (select count(distinct item) cnt, sum(distinct item) sm from a.int_array) v;
+---- TYPES
+BIGINT,BIGINT,BIGINT
+---- RESULTS
+1,3,6
+2,3,6
+3,0,NULL
+4,0,NULL
+5,0,NULL
+6,0,NULL
+7,0,NULL
+8,1,-1
+====
+---- QUERY
+# Multiple distinct with a subquery
+select sum(distinct v.cnt), count(distinct v.sm)
+from (select id, count(distinct int_col) cnt, sum(distinct tinyint_col) sm
+  from alltypestiny group by id) v;
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+1,2
+====
+---- QUERY
+# Multiple distinct with NULLs (from the left outer join)
+select count(distinct a.id), count(distinct b.id), avg(distinct b.tinyint_col)
+from alltypessmall a left outer join alltypestiny b on a.id = b.id
+where a.id < 12 and a.id > 5;
+---- TYPES
+BIGINT,BIGINT,DOUBLE
+---- RESULTS
+6,2,0.5
+====
+---- QUERY
+# Multiple distinct with a larger number of classes
+select
+  count(distinct id),
+  count(distinct tinyint_col),
+  count(distinct smallint_col),
+  count(distinct int_col),
+  count(distinct bigint_col),
+  count(distinct double_col),
+  count(distinct float_col),
+  count(distinct string_col),
+  count(distinct timestamp_col)
+from alltypestiny;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+8,2,2,2,2,2,2,2,8
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test 
b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
index b62b050..e2c624e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
@@ -159,3 +159,63 @@ BIGINT
 ---- RUNTIME_PROFILE
 row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
 ====
+---- QUERY
+# Multiple distinct
+set buffer_pool_limit=30M;
+select count(distinct l_orderkey), count(distinct l_partkey) from lineitem
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+1500000,200000
+---- RUNTIME_PROFILE
+row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# Multiple distinct with string col and group by
+set buffer_pool_limit=50m;
+select l_linenumber, count(distinct l_orderkey), count(distinct l_comment) 
from lineitem
+group by 1 order by 1 limit 5
+---- TYPES
+INT,BIGINT,BIGINT
+---- RESULTS
+1,1500000,1273334
+2,1285828,1102714
+3,1071394,929553
+4,857015,753374
+5,643287,574337
+---- RUNTIME_PROFILE
+row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# Multiple distinct and non-distinct, with an intermediate tuple (avg)
+set buffer_pool_limit=40m;
+select avg(distinct l_orderkey), count(distinct l_partkey), sum(l_tax), 
count(l_suppkey)
+from tpch_parquet.lineitem
+---- TYPES
+DOUBLE,BIGINT,DECIMAL,BIGINT
+---- RESULTS
+2999991.5,200000,240129.67,6001215
+---- RUNTIME_PROFILE
+row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# Multiple distinct and non-distinct, with a group by
+set buffer_pool_limit=55m;
+select l_linenumber, avg(distinct l_orderkey), count(distinct l_partkey), 
sum(l_tax), count(l_suppkey)
+from tpch_parquet.lineitem
+group by 1 order by 1 limit 5
+---- TYPES
+INT,DOUBLE,BIGINT,DECIMAL,BIGINT
+---- RESULTS
+1,2999991.5,199893,60025.25,1500000
+2,3000615.766574534,199674,51457.37,1285828
+3,3000079.631604246,199036,42879.38,1071394
+4,3000330.547357981,197222,34279.36,857015
+5,2999188.900650876,191905,25745.25,643287
+---- RUNTIME_PROFILE
+row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/df53ec23/tests/query_test/test_aggregation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_aggregation.py 
b/tests/query_test/test_aggregation.py
index daaa741..55233fa 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -359,10 +359,14 @@ class TestDistinctAggregation(ImpalaTestSuite):
     if vector.get_value('table_format').file_format == 'hbase':
       pytest.xfail("HBase returns columns in alphabetical order for select 
distinct *, "
                    "making the result verication to fail.")
-    if vector.get_value('table_format').file_format == 'kudu':
-      pytest.xfail("IMPALA-4042: count(distinct NULL) fails on a view, needed 
for kudu")
     self.run_test_case('QueryTest/distinct', vector)
 
+  def test_multiple_distinct(self, vector):
+    if vector.get_value('table_format').file_format == 'hbase':
+      pytest.xfail("HBase returns columns in alphabetical order for select 
distinct *, "
+                   "making the result verication to fail.")
+    self.run_test_case('QueryTest/multiple-distinct-aggs', vector)
+
 
 class TestWideAggregationQueries(ImpalaTestSuite):
   """Test that aggregations with many grouping columns work"""

Reply via email to