Repository: spark Updated Branches: refs/heads/branch-1.6 5e89f1645 -> ce90bbe42
[SPARK-11863][SQL] Unable to resolve order by if it contains mixture of aliases and real columns this is based on https://github.com/apache/spark/pull/9844, with some bug fix and clean up. The problems is that, normal operator should be resolved based on its child, but `Sort` operator can also be resolved based on its grandchild. So we have 3 rules that can resolve `Sort`: `ResolveReferences`, `ResolveSortReferences`(if grandchild is `Project`) and `ResolveAggregateFunctions`(if grandchild is `Aggregate`). For example, `select c1 as a , c2 as b from tab group by c1, c2 order by a, c2`, we need to resolve `a` and `c2` for `Sort`. Firstly `a` will be resolved in `ResolveReferences` based on its child, and when we reach `ResolveAggregateFunctions`, we will try to resolve both `a` and `c2` based on its grandchild, but failed because `a` is not a legal aggregate expression. whoever merge this PR, please give the credit to dilipbiswal Author: Dilip Biswal <[email protected]> Author: Wenchen Fan <[email protected]> Closes #9961 from cloud-fan/sort. (cherry picked from commit bc16a67562560c732833260cbc34825f7e9dcb8f) Signed-off-by: Michael Armbrust <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce90bbe4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce90bbe4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce90bbe4 Branch: refs/heads/branch-1.6 Commit: ce90bbe427623c52d0c5380a979197be84218e45 Parents: 5e89f16 Author: Dilip Biswal <[email protected]> Authored: Thu Nov 26 11:31:28 2015 -0800 Committer: Michael Armbrust <[email protected]> Committed: Thu Nov 26 11:31:42 2015 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 13 ++++++++++--- .../sql/catalyst/analysis/AnalysisSuite.scala | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ce90bbe4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 47962eb..94ffbbb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -630,7 +630,8 @@ class Analyzer( // Try resolving the ordering as though it is in the aggregate clause. try { - val aliasedOrdering = sortOrder.map(o => Alias(o.child, "aggOrder")()) + val unresolvedSortOrders = sortOrder.filter(s => !s.resolved || containsAggregate(s)) + val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")()) val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering) val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate] val resolvedAliasedOrdering: Seq[Alias] = @@ -663,13 +664,19 @@ class Analyzer( } } + val sortOrdersMap = unresolvedSortOrders + .map(new TreeNodeRef(_)) + .zip(evaluatedOrderings) + .toMap + val finalSortOrders = sortOrder.map(s => sortOrdersMap.getOrElse(new TreeNodeRef(s), s)) + // Since we don't rely on sort.resolved as the stop condition for this rule, // we need to check this and prevent applying this rule multiple times - if (sortOrder == evaluatedOrderings) { + if (sortOrder == finalSortOrders) { sort } else { Project(aggregate.output, - Sort(evaluatedOrderings, global, + Sort(finalSortOrders, global, aggregate.copy(aggregateExpressions = originalAggExprs ++ needsPushDown))) } } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/ce90bbe4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index e051069..aeeca80 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -220,6 +220,24 @@ class AnalysisSuite extends AnalysisTest { // checkUDF(udf4, expected4) } + test("SPARK-11863 mixture of aliases and real columns in order by clause - tpcds 19,55,71") { + val a = testRelation2.output(0) + val c = testRelation2.output(2) + val alias1 = a.as("a1") + val alias2 = c.as("a2") + val alias3 = count(a).as("a3") + + val plan = testRelation2 + .groupBy('a, 'c)('a.as("a1"), 'c.as("a2"), count('a).as("a3")) + .orderBy('a1.asc, 'c.asc) + + val expected = testRelation2 + .groupBy(a, c)(alias1, alias2, alias3) + .orderBy(alias1.toAttribute.asc, alias2.toAttribute.asc) + .select(alias1.toAttribute, alias2.toAttribute, alias3.toAttribute) + checkAnalysis(plan, expected) + } + test("analyzer should replace current_timestamp with literals") { val in = Project(Seq(Alias(CurrentTimestamp(), "a")(), Alias(CurrentTimestamp(), "b")()), LocalRelation()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
