[ https://issues.apache.org/jira/browse/SPARK-45230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wan Kun updated SPARK-45230:
----------------------------

Plan sorter for Aggregate after SMJ
-----------------------------------

    Key: SPARK-45230
    URL: https://issues.apache.org/jira/browse/SPARK-45230
    Project: Spark
    Issue Type: Improvement
    Components: SQL
    Affects Versions: 4.0.0
    Reporter: Wan Kun
    Priority: Major

Description:

If an aggregate operator comes after an SMJ (sort-merge join) and the grouping expressions of the aggregate contain all the join keys of the SMJ's streamed side, we can extend the sorter planned on the streamed side of the SMJ so that it also sorts by the remaining grouping expressions. The aggregate can then be converted to a SortAggregate, which the benchmark below shows to be about 1.8X faster than a HashAggregate.

For example, with tables t1(a, b, c) and t2(x, y, z):

{code}
SELECT a, b, sum(c)
FROM t1
JOIN t2
ON t1.b = t2.y
GROUP BY a, b
{code}

The optimized plan:

{code}
   Scan(t1)       Scan(t2)
      |              |
      |              |
  Exchange 1     Exchange 2
        \          /
         \        /
          \      /
     SMJ (t1.b = t2.y)
             |
             |
         Aggregate
{code}

Before this PR, Spark's EnsureRequirements rule adds Sort(t1.b) to the left (streamed) side of the SMJ. That ordering satisfies only the join, so the aggregate is planned as a HashAggregate:

{code}
   Scan(t1)       Scan(t2)
      |              |
      |              |
  Exchange 1     Exchange 2
        \          /
   Sort(t1.b)  Sort(t2.y)
          \      /
     SMJ (t1.b = t2.y)
             |
             |
       HashAggregate
{code}

If we instead add Sort(t1.b, t1.a) to the left side of the SMJ, the join's required ordering on t1.b is still satisfied, because t1.b is a prefix of the new ordering, and all rows of each (a, b) group now arrive together. The following aggregate can then be converted to a SortAggregate, which is faster:

{code}
     Scan(t1)         Scan(t2)
        |                |
        |                |
    Exchange 1       Exchange 2
          \            /
  Sort(t1.b, t1.a)  Sort(t2.y)
            \        /
       SMJ (t1.b = t2.y)
               |
               |
         SortAggregate
{code}

Benchmark result:

{code}
Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.16
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Aggregate after SMJ:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Hash aggregate after SMJ                          48823          48886          89          0.4        2328.1       1.0X
Sort aggregate after SMJ                          26529          26750         311          0.8        1265.0       1.8X
{code}
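The eligibility condition and the choice of sort order described in this issue can be sketched in a few lines of Python. This is a hypothetical illustration, not Spark's implementation: columns are plain names instead of Catalyst expressions, all orderings are assumed ascending, and `plan_streamed_sort_order`, `satisfies` and `groups_contiguous` are made-up helpers. The proposed order puts the streamed-side join keys first, so the SMJ's required ordering stays a prefix, and appends the other grouping keys, so every group becomes contiguous.

```python
from typing import List, Optional, Sequence


def plan_streamed_sort_order(
    streamed_join_keys: Sequence[str], grouping_keys: Sequence[str]
) -> Optional[List[str]]:
    """Hypothetical helper: pick a streamed-side sort order that serves both
    the SMJ and the aggregate above it, or return None if the rule does not apply."""
    # Only applicable when the grouping keys contain every streamed join key.
    if not set(streamed_join_keys) <= set(grouping_keys):
        return None
    # Join keys first (keeps the SMJ happy), then the grouping keys it lacks.
    extra = [k for k in grouping_keys if k not in streamed_join_keys]
    return list(streamed_join_keys) + extra


def satisfies(provided: Sequence[str], required: Sequence[str]) -> bool:
    """An all-ascending ordering satisfies a requirement it starts with."""
    return list(provided[: len(required)]) == list(required)


def groups_contiguous(provided: Sequence[str], grouping_keys: Sequence[str]) -> bool:
    """Rows of each group are adjacent if the ordering starts with all
    grouping keys, in any order."""
    n = len(grouping_keys)
    return len(provided) >= n and set(provided[:n]) == set(grouping_keys)


# Example from the issue: GROUP BY a, b over t1 JOIN t2 ON t1.b = t2.y.
order = plan_streamed_sort_order(["b"], ["a", "b"])
print(order)                                    # ['b', 'a']
print(satisfies(order, ["b"]))                  # True: SMJ still sees t1.b sorted
print(groups_contiguous(order, ["a", "b"]))     # True: a sort-based aggregate works
print(groups_contiguous(["b"], ["a", "b"]))     # False: Sort(t1.b) alone is not enough
print(plan_streamed_sort_order(["b"], ["a"]))   # None: b is not a grouping key
```

Putting the join keys first is the design choice that lets one sort serve both operators: the SMJ only checks the prefix, while the aggregate only needs the first columns to cover its grouping keys in some order.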
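The derived columns of the benchmark table can be cross-checked from Best Time and Per Row. This sketch assumes Spark's usual benchmark conventions (Relative = baseline best time / this case's best time; Per Row(ns) = Best Time(ms) × 10^6 / rows). The row count is not stated in the issue; the value computed below is only an inference from the table.

```python
# Best times (ms) and per-row times (ns) copied from the benchmark table.
best_ms = {"hash": 48823, "sort": 26529}
per_row_ns = {"hash": 2328.1, "sort": 1265.0}

# Relative speed-up: baseline (hash aggregate) best time over sort aggregate's.
speedup = best_ms["hash"] / best_ms["sort"]
print(f"{speedup:.1f}X")  # 1.8X

# Rows per run implied by Per Row(ns) = Best Time(ms) * 1e6 / rows (inferred).
implied_rows = {name: best_ms[name] * 1e6 / per_row_ns[name] for name in best_ms}
for name, rows in implied_rows.items():
    print(f"{name}: ~{rows / 1e6:.1f}M rows")
```

Both cases imply roughly the same input size, as expected for two plans over the same data, and the speed-up measured on Avg Time is similar (48886 / 26750 ≈ 1.83).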