[ https://issues.apache.org/jira/browse/SPARK-13363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200094#comment-15200094 ]

Reynold Xin commented on SPARK-13363:
-------------------------------------

[~maropu] if you have time to bring your patch up to date, that'd be great.


> Aggregator not working with DataFrame
> -------------------------------------
>
>                 Key: SPARK-13363
>                 URL: https://issues.apache.org/jira/browse/SPARK-13363
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: koert kuipers
>            Priority: Blocker
>
> The org.apache.spark.sql.expressions.Aggregator scaladoc says: "A base class 
> for user-defined aggregations, which can be used in [[DataFrame]] and 
> [[Dataset]]."
> It works well with Dataset/GroupedDataset, but I am having no luck using it 
> with DataFrame/GroupedData. Does anyone have an example of how to use it with 
> a DataFrame?
> In particular, I would like to use it with this method in GroupedData:
> {noformat}
>   def agg(expr: Column, exprs: Column*): DataFrame
> {noformat}
> Clearly it should be possible, since GroupedDataset uses that very same 
> method to do the work:
> {noformat}
>   private def agg(exprs: Column*): DataFrame =
>     groupedData.agg(withEncoder(exprs.head), exprs.tail.map(withEncoder): _*)
> {noformat}
> The trick seems to be the wrapping in withEncoder, which is private. I tried 
> to do something like it myself, but had no luck, since it relies on more 
> private machinery in TypedColumn.
> Anyhow, here is my attempt at using it with a DataFrame:
> {noformat}
> val simpleSum = new SqlAggregator[Int, Int, Int] {
>   def zero: Int = 0                     // The initial value.
>   def reduce(b: Int, a: Int) = b + a    // Add an element to the running total
>   def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
>   def finish(b: Int) = b                // Return the final result.
> }.toColumn
> val df = sc.makeRDD(1 to 3).map(i => (i, i)).toDF("k", "v")
> df.groupBy("k").agg(simpleSum).show
> {noformat}
> And the resulting error:
> {noformat}
> org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [k#104], [k#104,($anon$3(),mode=Complete,isDistinct=false) AS sum#106];
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:46)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:241)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:122)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:46)
> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:49)
> {noformat}
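For context, the zero/reduce/merge/finish contract the reporter's aggregator implements can be sketched without any Spark dependency as a plain fold over partitioned input. This is a minimal illustration of the semantics only; the trait and method names below (SimpleAggregator, aggregate) are illustrative, not Spark's API.

```scala
// Sketch of the Aggregator contract: each partition is folded from
// `zero` with `reduce`, the partial buffers are combined with `merge`,
// and `finish` produces the final result.
trait SimpleAggregator[IN, BUF, OUT] {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(b: BUF): OUT

  // Evaluate the aggregate over partitioned input, the way an engine
  // would: fold each partition locally, then merge the partials.
  def aggregate(partitions: Seq[Seq[IN]]): OUT =
    finish(
      partitions
        .map(_.foldLeft(zero)(reduce))
        .reduceOption(merge)
        .getOrElse(zero)
    )
}

val simpleSum = new SimpleAggregator[Int, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Int): Int = b + a
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(b: Int): Int = b
}
```

Because merge must combine buffers produced on different partitions, zero/reduce/merge should form a monoid over the buffer type; that holds for the sum above, which is why the logic itself is sound and the failure in the report is purely an analysis/encoder issue on the DataFrame side.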



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
