Repository: spark Updated Branches: refs/heads/master 12854464c -> 527c780bb
Revert "[SPARK-13363][SQL] support Aggregator in RelationalGroupedDataset" This reverts commit 12854464c4fa30c4df3b5b17bd8914d048dbf4a9. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/527c780b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/527c780b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/527c780b Branch: refs/heads/master Commit: 527c780bb0d6cb074128448da00cb330e9049385 Parents: 1285446 Author: Reynold Xin <[email protected]> Authored: Sat Apr 16 01:05:26 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Sat Apr 16 01:05:26 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/RelationalGroupedDataset.scala | 6 +----- .../org/apache/spark/sql/DatasetAggregatorSuite.scala | 14 +------------- 2 files changed, 2 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/527c780b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index deb2e82..7dbf2e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -208,11 +208,7 @@ class RelationalGroupedDataset protected[sql]( */ @scala.annotation.varargs def agg(expr: Column, exprs: Column*): DataFrame = { - toDF((expr +: exprs).map { - case typed: TypedColumn[_, _] => - typed.withInputType(df.resolvedTEncoder, df.logicalPlan.output).expr - case c => c.expr - }) + toDF((expr +: exprs).map(_.expr)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/527c780b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index 0d84a59..3a7215e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import scala.language.postfixOps +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.expressions.scala.typed import org.apache.spark.sql.functions._ @@ -84,15 +85,6 @@ class ParameterizedTypeSum[IN, OUT : Numeric : Encoder](f: IN => OUT) override def outputEncoder: Encoder[OUT] = implicitly[Encoder[OUT]] } -object RowAgg extends Aggregator[Row, Int, Int] { - def zero: Int = 0 - def reduce(b: Int, a: Row): Int = a.getInt(0) + b - def merge(b1: Int, b2: Int): Int = b1 + b2 - def finish(r: Int): Int = r - override def bufferEncoder: Encoder[Int] = Encoders.scalaInt - override def outputEncoder: Encoder[Int] = Encoders.scalaInt -} - class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { @@ -208,8 +200,4 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { (1279869254, "Some String")) } - test("aggregator in DataFrame/Dataset[Row]") { - val df = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j") - checkAnswer(df.groupBy($"j").agg(RowAgg.toColumn), Row("a", 1) :: Row("b", 5) :: Nil) - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
