Nicholas,

This may or may not be much help, but in RasterFrames we have an approximate quantiles Expression, computed against Tiles (2D geospatial arrays), which uses `org.apache.spark.sql.catalyst.util.QuantileSummaries` to do the hard work. So perhaps it's a directionally correct example of what you're looking to do?
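The heart of it is a handful of `QuantileSummaries` calls. Here is a rough, standalone sketch of that flow, assuming a plain stream of Doubles rather than Tiles (and assuming a recent Spark, where `query` returns an `Option`):

```scala
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// One summary per partition: summaries are small, mergeable, and queryable,
// which is exactly the shape an aggregation buffer wants.
val empty = new QuantileSummaries(
  QuantileSummaries.defaultCompressThreshold,
  QuantileSummaries.defaultRelativeError)

def summarize(values: Iterator[Double]): QuantileSummaries =
  values.foldLeft(empty)((summary, v) => summary.insert(v)).compress()

// Pretend these are two partitions' worth of values.
val partitions = Seq(Seq(1.0, 3.0, 5.0), Seq(2.0, 4.0))
val merged = partitions.map(p => summarize(p.iterator)).reduce((a, b) => a.merge(b))

// None only if the summary never saw a value; 0.5 asks for the median,
// accurate to within the configured relative error.
val approxMedian: Option[Double] = merged.query(0.5)
```

The full Expression against Tiles is here: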
https://github.com/locationtech/rasterframes/blob/develop/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala

In that same package are a number of other Aggregates, including declarative ones, which are another way of computing aggregations through composition of other Expressions.

Simeon

On Thu, Dec 9, 2021 at 9:26 PM Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

> I'm trying to create a new aggregate function. It's my first time working
> with Catalyst, so it's exciting---but I'm also in a bit over my head.
>
> My goal is to create a function to calculate the median
> <https://issues.apache.org/jira/browse/SPARK-26589>.
>
> As a very simple solution, I could just define median to be an alias of
> `Percentile(col, 0.5)`. However, the leading comment on the Percentile
> expression
> <https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L37-L39>
> highlights that it's very memory-intensive and can easily lead to
> OutOfMemory errors.
>
> So instead of using Percentile, I'm trying to create an Expression that
> calculates the median without needing to hold everything in memory at once.
> I'm considering two different approaches:
>
> 1. Define Median as a combination of existing expressions: The median can
> perhaps be built out of the existing expressions for Count
> <https://github.com/apache/spark/blob/9af338cd685bce26abbc2dd4d077bde5068157b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala#L48>
> and NthValue
> <https://github.com/apache/spark/blob/568ad6aa4435ce76ca3b5d9966e64259ea1f9b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L675>.
>
> I don't see a template I can follow for building a new expression out of
> existing expressions (i.e. without having to implement a bunch of methods
> for DeclarativeAggregate or ImperativeAggregate). I also don't know how I
> would wrap NthValue to make it usable as a regular aggregate function. The
> wrapped NthValue would need an implicit window that provides the necessary
> ordering.
>
> Is there any potential to this idea? Any pointers on how to implement it?
>
> 2. Another memory-light approach to calculating the median requires
> multiple passes over the data to converge on the answer. The approach is
> described here
> <https://www.quora.com/Distributed-Algorithms/What-is-the-distributed-algorithm-to-determine-the-median-of-arrays-of-integers-located-on-different-computers>.
> (I posted a sketch implementation of this approach using Spark's
> user-level API here
> <https://issues.apache.org/jira/browse/SPARK-26589?focusedCommentId=17452081&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17452081>.)
>
> I am also struggling to understand how I would build an aggregate function
> like this, since it requires multiple passes over the data. From what I
> can see, Catalyst's aggregate functions are designed to work with a single
> pass over the data.
>
> We don't seem to have an interface for AggregateFunction that supports
> multiple passes over the data. Is there some way to do this?
>
> Again, this is my first serious foray into Catalyst. Any specific
> implementation guidance is appreciated!
>
> Nick

--
Simeon Fitch
Co-founder & VP of R&D
Astraea, Inc.
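A minimal sketch of the bisection idea described in point 2 of the quoted message, written against the public DataFrame API rather than as a Catalyst Expression. This is only an illustration of the multi-pass approach, not the implementation posted on SPARK-26589; the function name, column name, and tolerance are invented for the example, and the input column is assumed to already be DoubleType:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max, min}

// Each iteration makes one full pass (a filtered count) over the data; the
// driver bisects the value range until it pins down the point at or below
// which roughly half of the rows fall.
def medianByBisection(df: DataFrame, valueCol: String, tolerance: Double = 1e-6): Double = {
  val total  = df.count().toDouble
  val bounds = df.agg(min(col(valueCol)), max(col(valueCol))).head()
  var lo = bounds.getDouble(0)
  var hi = bounds.getDouble(1)
  while (hi - lo > tolerance) {
    val mid = (lo + hi) / 2.0
    val atOrBelow = df.filter(col(valueCol) <= mid).count().toDouble
    if (atOrBelow / total >= 0.5) hi = mid else lo = mid
  }
  hi
}

// Hypothetical usage; caching the input matters, since it is scanned once per
// iteration:
// val median = medianByBisection(df.select(col("price").cast("double").as("price")).cache(), "price")
```

The number of passes grows with log2((max - min) / tolerance), so nothing ever has to be held in memory, but every pass is a full scan of the data.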