Repository: spark
Updated Branches:
  refs/heads/master 12854464c -> 527c780bb


Revert "[SPARK-13363][SQL] support Aggregator in RelationalGroupedDataset"

This reverts commit 12854464c4fa30c4df3b5b17bd8914d048dbf4a9.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/527c780b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/527c780b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/527c780b

Branch: refs/heads/master
Commit: 527c780bb0d6cb074128448da00cb330e9049385
Parents: 1285446
Author: Reynold Xin <[email protected]>
Authored: Sat Apr 16 01:05:26 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sat Apr 16 01:05:26 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/RelationalGroupedDataset.scala   |  6 +-----
 .../org/apache/spark/sql/DatasetAggregatorSuite.scala | 14 +-------------
 2 files changed, 2 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/527c780b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index deb2e82..7dbf2e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -208,11 +208,7 @@ class RelationalGroupedDataset protected[sql](
    */
   @scala.annotation.varargs
   def agg(expr: Column, exprs: Column*): DataFrame = {
-    toDF((expr +: exprs).map {
-      case typed: TypedColumn[_, _] =>
-        typed.withInputType(df.resolvedTEncoder, df.logicalPlan.output).expr
-      case c => c.expr
-    })
+    toDF((expr +: exprs).map(_.expr))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/527c780b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 0d84a59..3a7215e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import scala.language.postfixOps
 
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.expressions.scala.typed
 import org.apache.spark.sql.functions._
@@ -84,15 +85,6 @@ class ParameterizedTypeSum[IN, OUT : Numeric : Encoder](f: IN => OUT)
   override def outputEncoder: Encoder[OUT] = implicitly[Encoder[OUT]]
 }
 
-object RowAgg extends Aggregator[Row, Int, Int] {
-  def zero: Int = 0
-  def reduce(b: Int, a: Row): Int = a.getInt(0) + b
-  def merge(b1: Int, b2: Int): Int = b1 + b2
-  def finish(r: Int): Int = r
-  override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
-  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
-}
-
 
 class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
 
@@ -208,8 +200,4 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
         (1279869254, "Some String"))
   }
 
-  test("aggregator in DataFrame/Dataset[Row]") {
-    val df = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
-    checkAnswer(df.groupBy($"j").agg(RowAgg.toColumn), Row("a", 1) :: Row("b", 5) :: Nil)
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to