Hello,
The Spark SQL rule RewriteDistinctAggregates rewrites a plan containing multiple
distinct aggregate expressions into two Aggregate nodes and an Expand node.
The following is the second example from the class documentation. I wonder
whether we could reorder the second Aggregate node and the Expand node so that
the Expand generates fewer records?
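(For reference, a plan like the one below can be reproduced from spark-shell
with something like the following; the "data" view and its columns are made up
here to match the example, and the FILTER clause needs Spark 3.0+.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical "data" view with the example's columns.
Seq((1, "a", "x", 10L, 0), (1, "b", "y", 20L, 2))
  .toDF("key", "cat1", "cat2", "value", "id")
  .createOrReplaceTempView("data")

spark.sql("""
  SELECT COUNT(DISTINCT cat1) AS cat1_cnt,
         COUNT(DISTINCT cat2) AS cat2_cnt,
         SUM(value) FILTER (WHERE id > 1) AS total
  FROM data
  GROUP BY key
""").explain(true)  // optimized plan shows the two Aggregates and the Expand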
Thanks
Second example: aggregate function without distinct and with filter clauses (in SQL):
SELECT
  COUNT(DISTINCT cat1) AS cat1_cnt,
  COUNT(DISTINCT cat2) AS cat2_cnt,
  SUM(value) FILTER (WHERE id > 1) AS total
FROM
  data
GROUP BY
  key
This translates to the following (pseudo) logical plan:
Aggregate(
   key = ['key]
   functions = [COUNT(DISTINCT 'cat1),
                COUNT(DISTINCT 'cat2),
                sum('value) with FILTER('id > 1)]
   output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  LocalTableScan [...]
This rule rewrites this logical plan to the following (pseudo) logical plan:
Aggregate(
   key = ['key]
   functions = [count(if (('gid = 1)) 'cat1 else null),
                count(if (('gid = 2)) 'cat2 else null),
                first(if (('gid = 0)) 'total else null) ignore nulls]
   output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  Aggregate(
     key = ['key, 'cat1, 'cat2, 'gid]
     functions = [sum('value) with FILTER('id > 1)]
     output = ['key, 'cat1, 'cat2, 'gid, 'total])
    Expand(
       projections = [('key, null, null, 0, cast('value as bigint), 'id),
                      ('key, 'cat1, null, 1, null, null),
                      ('key, null, 'cat2, 2, null, null)]
       output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
      LocalTableScan [...]
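As I understand it, the Expand here emits one copy of every input row per
projection, so the lower Aggregate consumes three times the scan's row count.
A rough model in plain Scala (the In/Out classes and expand are mine, just for
illustration, not the actual Catalyst operator):

case class In(key: Int, cat1: String, cat2: String, value: Long, id: Int)
case class Out(key: Int, cat1: Option[String], cat2: Option[String],
               gid: Int, value: Option[Long], id: Option[Int])

// n input rows become 3n output rows, one per projection.
def expand(rows: Seq[In]): Seq[Out] = rows.flatMap { r =>
  Seq(
    Out(r.key, None, None, 0, Some(r.value), Some(r.id)), // gid = 0: non-distinct branch
    Out(r.key, Some(r.cat1), None, 1, None, None),        // gid = 1: DISTINCT cat1
    Out(r.key, None, Some(r.cat2), 2, None, None)         // gid = 2: DISTINCT cat2
  )
}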
Could we rewrite this logical plan to the following instead:
Aggregate(
   key = ['key]
   functions = [count(if (('gid = 1)) 'cat1 else null),
                count(if (('gid = 2)) 'cat2 else null),
                first(if (('gid = 0)) 'total else null) ignore nulls]
   output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  Expand(
     projections = [('key, 'total, null, null, 0, cast('value as bigint)),
                    ('key, 'total, 'cat1, null, 1, null),
                    ('key, 'total, null, 'cat2, 2, null)]
     output = ['key, 'total, 'cat1, 'cat2, 'gid, 'value])
    Aggregate(
       key = ['key, 'cat1, 'cat2]
       functions = [sum('value) with FILTER('id > 1)]
       output = ['key, 'cat1, 'cat2, 'total])
      LocalTableScan [...]
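To make the intended saving concrete, here is a rough model of the proposed
order, reusing In from the sketch above (this only models the dataflow, with
the 'value column omitted for brevity, not the exact merge expressions):

// Pre-aggregate per (key, cat1, cat2), then expand: the Expand now sees one
// row per distinct (key, cat1, cat2) group instead of one row per input row.
def proposedExpand(rows: Seq[In]): Seq[(Int, Option[Long], Option[String], Option[String], Int)] =
  rows
    .groupBy(r => (r.key, r.cat1, r.cat2))
    .toSeq
    .flatMap { case ((key, cat1, cat2), group) =>
      // sum('value) with FILTER('id > 1), computed per group
      val total = group.filter(_.id > 1).map(_.value).sum
      Seq(
        (key, Some(total), None, None, 0),       // gid = 0
        (key, Some(total), Some(cat1), None, 1), // gid = 1
        (key, Some(total), None, Some(cat2), 2)  // gid = 2
      )
    }

If the number of distinct (key, cat1, cat2) combinations is much smaller than
the raw row count, the Expand blowup should shrink accordingly.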