Wan Kun created SPARK-45230:
-------------------------------

             Summary: Plan sorter for Aggregate after SMJ
                 Key: SPARK-45230
                 URL: https://issues.apache.org/jira/browse/SPARK-45230
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 4.0.0
            Reporter: Wan Kun

If an aggregate operator comes after an SMJ (sort merge join) and the grouping expressions of the aggregate contain all the join keys of the streamed side, we can add a sorter on the streamed side of the SMJ so that the aggregate can be converted to a SortAggregate. A SortAggregate does not need to build a hash table and can be faster than a HashAggregate.

For example, with tables t1(a, b, c) and t2(x, y, z):
{code}
SELECT a, b, sum(c) FROM t1 JOIN t2 ON t1.b = t2.y GROUP BY a, b
{code}

The optimized plan:
{code}
  Scan(t1)          Scan(t2)
     |                 |
     |                 |
 Exchange 1        Exchange 2
       \              /
        \            /
         \          /
      SMJ (t1.b = t2.y)
              |
              |
          Aggregate
{code}

Before this PR, Spark's EnsureRequirements rule adds Sort(t1.b) to the left side of the SMJ (and Sort(t2.y) to the right side), so the aggregate is planned as a HashAggregate:
{code}
  Scan(t1)          Scan(t2)
     |                 |
     |                 |
 Exchange 1        Exchange 2
     |                 |
 Sort(t1.b)        Sort(t2.y)
       \              /
      SMJ (t1.b = t2.y)
              |
              |
        HashAggregate
{code}

If we instead add Sort(t1.b, t1.a) to the left side of the SMJ, the following aggregate can be converted to a SortAggregate, which should be faster:
{code}
  Scan(t1)            Scan(t2)
     |                   |
     |                   |
 Exchange 1          Exchange 2
     |                   |
 Sort(t1.b, t1.a)    Sort(t2.y)
         \              /
        SMJ (t1.b = t2.y)
                |
                |
          SortAggregate
{code}
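To make the ordering argument concrete, here is a toy Python model of the two plans. It is a sketch under stated assumptions, not Spark's implementation: tables are lists of tuples with made-up data, and `sort_merge_join`, `hash_aggregate`, `sort_aggregate` and `join_with_left_sort` are hypothetical helpers. In this model the merge join emits rows in the order of its streamed (left) input, and the sort-based aggregate only needs rows with equal grouping keys to be adjacent. With Sort(t1.b) alone, rows of one (a, b) group can be split into separate runs; with Sort(t1.b, t1.a) each group is contiguous and the sort-based result matches the hash-based one.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical toy data: t1(a, b, c) and t2(x, y, z) as lists of tuples.
T1 = [(1, 10, 5), (2, 10, 7), (1, 10, 3), (1, 20, 4), (3, 30, 1)]
T2 = [(100, 10, 0), (200, 20, 0), (300, 20, 0)]

GROUP_KEY = itemgetter(0, 1)  # (t1.a, t1.b) in a joined row
VALUE = itemgetter(2)         # t1.c in a joined row


def sort_merge_join(left, right, lkey, rkey):
    """Inner sort-merge join. Both inputs must already be sorted by their
    join key; output rows follow the order of the left (streamed) input."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = lkey(left[i]), rkey(right[j])
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right rows that share this join key.
            j_end = j
            while j_end < len(right) and rkey(right[j_end]) == lk:
                j_end += 1
            # Pair each left row with this key with every matching right row.
            while i < len(left) and lkey(left[i]) == lk:
                out.extend(left[i] + r for r in right[j:j_end])
                i += 1
            j = j_end
    return out


def hash_aggregate(rows, key, value):
    """SUM per group using a hash map; input order does not matter."""
    acc = {}
    for r in rows:
        acc[key(r)] = acc.get(key(r), 0) + value(r)
    return acc


def sort_aggregate(rows, key, value):
    """SUM per run of equal keys; correct only if equal keys are adjacent."""
    return [(k, sum(value(r) for r in run)) for k, run in groupby(rows, key=key)]


def join_with_left_sort(left_sort_key):
    """Plan for: SELECT a, b, sum(c) FROM t1 JOIN t2 ON t1.b = t2.y GROUP BY a, b"""
    left = sorted(T1, key=left_sort_key)    # sort on the streamed side
    right = sorted(T2, key=itemgetter(1))   # Sort(t2.y)
    return sort_merge_join(left, right, itemgetter(1), itemgetter(1))


if __name__ == "__main__":
    # Current plan, Sort(t1.b): the (1, 10) group is split into two runs.
    print(sort_aggregate(join_with_left_sort(itemgetter(1)), GROUP_KEY, VALUE))
    # -> [((1, 10), 5), ((2, 10), 7), ((1, 10), 3), ((1, 20), 8)]

    # Proposed plan, Sort(t1.b, t1.a): each (a, b) group is one contiguous run.
    joined = join_with_left_sort(itemgetter(1, 0))
    print(sort_aggregate(joined, GROUP_KEY, VALUE))
    # -> [((1, 10), 8), ((2, 10), 7), ((1, 20), 8)]
    print(hash_aggregate(joined, GROUP_KEY, VALUE))
    # -> {(1, 10): 8, (2, 10): 7, (1, 20): 8}
```

The sort-based aggregate keeps only one group's running sum at a time, while the hash-based one keeps a map entry per group; that difference is the motivation for the extra sort column, provided the sort itself is cheap relative to the hash aggregation.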