[
https://issues.apache.org/jira/browse/SPARK-45881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818356#comment-17818356
]
Steven Aerts commented on SPARK-45881:
--------------------------------------
The pull request just got closed because of inactivity, while I think it is
still relevant.
Can someone take a look?
> Use Higher Order aggregate functions from SQL
> ---------------------------------------------
>
> Key: SPARK-45881
> URL: https://issues.apache.org/jira/browse/SPARK-45881
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Steven Aerts
> Priority: Major
> Labels: pull-request-available
>
> Higher-order aggregate functions are aggregate functions that take a lambda
> function as a parameter.
> An example of this from Presto is the function
> {{[reduce_agg|https://prestodb.io/docs/current/functions/aggregate.html#reduce_agg]}}
> which has the signature {{reduce_agg(inputValue T, initialState S,
> inputFunction(S, T, S), combineFunction(S, S, S))}} and it works like this:
> {code:sql}
> SELECT id, reduce_agg(value, 0, (a, b) -> a + b, (a, b) -> a + b)
> FROM (VALUES (1, 2), (1, 3), (1, 4), (2, 20), (2, 30), (2, 40)) AS t(id, value)
> GROUP BY id;
> -- (1, 9)
> -- (2, 90)
> {code}
> In Spark today, you can define, implement and use such a custom function from
> the Scala API by implementing a case class that extends
> {{TypedImperativeAggregate}} and mixes in the {{HigherOrderFunction}} trait.
> However, if you try to use this function from the SQL API, you get:
> {code:java}
> org.apache.spark.sql.AnalysisException: A lambda function should only be used in a higher order function. However, its class is org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, which is not a higher order function.; line 2 pos 2
>   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$22$$anonfun$applyOrElse$153.$anonfun$applyOrElse$155(Analyzer.scala:2142)
>   at scala.Option.map(Option.scala:230)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$22$$anonfun$applyOrElse$153.$anonfun$applyOrElse$154(Analyzer.scala:2135)
>   at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:100)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$22$$anonfun$applyOrElse$153.applyOrElse(Analyzer.scala:2143)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$22$$anonfun$applyOrElse$153.applyOrElse(Analyzer.scala:2132)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
> {code}
> Only a small change is missing in the {{Analyzer}} to get all of this
> working. We will provide a fix, unblocking higher-order aggregate functions
> in Spark SQL.
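
To make the reduce_agg semantics quoted above concrete, here is a minimal sketch in plain Python. It is not Spark or Presto code: the Python function name reduce_agg, the num_partitions parameter and the round-robin partitioning are assumptions made only for illustration. It shows why the signature carries two lambdas: inputFunction folds each value into a per-partition state, and combineFunction merges the partial states of the same group.

```python
def reduce_agg(rows, initial_state, input_function, combine_function, num_partitions=2):
    """Fold (key, value) rows per key, mimicking a partitioned aggregation."""
    # Hypothetical partitioning: split the rows round-robin to imitate
    # the data being spread over several workers.
    partitions = [rows[i::num_partitions] for i in range(num_partitions)]

    # Phase 1: inside each partition, fold every value into the state
    # of its group, starting from initial_state.
    partial_states = []
    for partition in partitions:
        states = {}
        for key, value in partition:
            states[key] = input_function(states.get(key, initial_state), value)
        partial_states.append(states)

    # Phase 2: merge the partial states of the same group across partitions.
    merged = {}
    for states in partial_states:
        for key, state in states.items():
            merged[key] = combine_function(merged[key], state) if key in merged else state
    return merged


# Same data and lambdas as the SQL example in the issue description.
rows = [(1, 2), (1, 3), (1, 4), (2, 20), (2, 30), (2, 40)]
result = reduce_agg(rows, 0, lambda s, v: s + v, lambda a, b: a + b)
print(sorted(result.items()))  # [(1, 9), (2, 90)]
```

The result should not depend on how the rows are partitioned, which is why combineFunction has to be consistent with inputFunction.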