Repository: spark Updated Branches: refs/heads/branch-1.6 70fcbf68e -> bd8efba8f
[SPARK-13087][SQL] Fix group by function for sort based aggregation It is not valid to call `toAttribute` on a `NamedExpression` unless we know for sure that the child produced that `NamedExpression`. The current code worked fine when the grouping expressions were simple, but when they were derived values, this blew up at execution time. Author: Michael Armbrust <[email protected]> Closes #11011 from marmbrus/groupByFunction. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd8efba8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd8efba8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd8efba8 Branch: refs/heads/branch-1.6 Commit: bd8efba8f2131d951829020b4c68309a174859cf Parents: 70fcbf6 Author: Michael Armbrust <[email protected]> Authored: Tue Feb 2 16:51:07 2016 +0800 Committer: Yin Huai <[email protected]> Committed: Tue Feb 2 16:51:07 2016 +0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/execution/aggregate/utils.scala | 5 ++--- .../spark/sql/hive/execution/AggregationQuerySuite.scala | 8 ++++++++ 2 files changed, 10 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bd8efba8/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala index 76b938c..751285a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala @@ -33,15 +33,14 @@ object Utils { resultExpressions: Seq[NamedExpression], child: SparkPlan): Seq[SparkPlan] = { - val groupingAttributes = 
groupingExpressions.map(_.toAttribute) val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete)) val completeAggregateAttributes = completeAggregateExpressions.map { expr => aggregateFunctionToAttribute(expr.aggregateFunction, expr.isDistinct) } SortBasedAggregate( - requiredChildDistributionExpressions = Some(groupingAttributes), - groupingExpressions = groupingAttributes, + requiredChildDistributionExpressions = Some(groupingExpressions), + groupingExpressions = groupingExpressions, nonCompleteAggregateExpressions = Nil, nonCompleteAggregateAttributes = Nil, completeAggregateExpressions = completeAggregateExpressions, http://git-wip-us.apache.org/repos/asf/spark/blob/bd8efba8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 064c000..64bff82 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -193,6 +193,14 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te sqlContext.dropTempTable("emptyTable") } + test("group by function") { + Seq((1, 2)).toDF("a", "b").registerTempTable("data") + + checkAnswer( + sql("SELECT floor(a) AS a, collect_set(b) FROM data GROUP BY floor(a) ORDER BY a"), + Row(1, Array(2)) :: Nil) + } + test("empty table") { // If there is no GROUP BY clause and the table is empty, we will generate a single row. checkAnswer( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
