Thanks for the suggestions. I suppose I should share a bit more about what
I tried/learned, so others who come later can understand why a
memory-efficient, exact median is not in Spark.

Spark's own ApproximatePercentile also uses QuantileSummaries internally
<https://github.com/apache/spark/blob/3f3201a7882b817a8a3ecbfeb369dde01e7689d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala#L225-L237>.
QuantileSummaries is a helper class for computing approximate quantiles
with a single pass over the data. I don't think I can use it to compute an
exact median.
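To illustrate the trade-off: any single-pass, bounded-memory median has to be approximate. Here is a deliberately simple stand-in in plain Scala (a uniform reservoir sample — QuantileSummaries itself uses a Greenwald-Khanna-style summary with a proper error bound, so this sketches the idea, not its actual algorithm):

```scala
import scala.util.Random

// Single pass, memory bounded by k, answer only approximate.
// Assumes non-empty input; `approxMedian` is a made-up name for illustration.
def approxMedian(data: Iterator[Double], k: Int, rng: Random): Double = {
  val reservoir = new Array[Double](k)
  var seen = 0L
  data.foreach { x =>
    if (seen < k) reservoir(seen.toInt) = x
    else {
      // Replace a random slot with decreasing probability k / (seen + 1).
      val j = (rng.nextDouble() * (seen + 1)).toLong
      if (j < k) reservoir(j.toInt) = x
    }
    seen += 1
  }
  val filled = math.min(seen, k.toLong).toInt
  val sample = reservoir.take(filled).sorted
  sample(filled / 2) // median of the sample approximates the true median
}
```

The point is the shape of the thing: one streaming pass, fixed memory, approximate answer. That shape is what fits a Catalyst aggregate buffer.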

Spark does already have code to compute an exact median: Percentile
<https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L74-L80>.
Since it works like other Catalyst expressions, it computes the median with
a single pass over the data. It does that by loading all the data into a
buffer and sorting it in memory
<https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L209>.
This is why the leading comment on Percentile warns that too much data will
cause GC pauses and OOMs
<https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L37-L39>.
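A stripped-down sketch of that strategy (the real Percentile aggregates a value-to-count map and interpolates between ranks, but the memory profile is the same — everything is resident before the sort):

```scala
// Sketch of Percentile's approach: buffer the whole column, sort, index.
// Memory grows linearly with the input, which is exactly what the
// class-level comment warns about. Assumes non-empty input.
def exactMedianInMemory(data: Iterator[Double]): Double = {
  val buffer = data.toArray        // all values held in memory at once
  java.util.Arrays.sort(buffer)
  val n = buffer.length
  if (n % 2 == 1) buffer(n / 2)
  else (buffer(n / 2 - 1) + buffer(n / 2)) / 2.0
}
```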

So I think this is what Reynold was getting at: as Catalyst expressions are
designed today, there is no way to save memory by making multiple passes
over the data. An approximate median is therefore your best bet if you want
to avoid high memory usage.

You can still compute an exact median in other ways, such as multiple
passes over the data or window functions, but those approaches can't be
captured in a Catalyst Expression.
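For the curious, here is roughly what the multi-pass approach looks like in plain Scala (a sketch, not Spark code: `data` stands in for a re-scannable dataset, and each call to `data()` is one full pass; memory per pass is constant):

```scala
// Binary-search the value range; each iteration makes one counting
// pass over the data. k is the 0-indexed rank of the element wanted.
def kthSmallest(data: () => Iterator[Long], k: Long): Long = {
  var lo = data().min
  var hi = data().max
  while (lo < hi) {
    val mid = lo + (hi - lo) / 2
    val atOrBelow = data().count(_ <= mid) // one full pass, O(1) memory
    if (atOrBelow > k) hi = mid else lo = mid + 1
  }
  lo
}

def exactMedianMultiPass(data: () => Iterator[Long]): Double = {
  val n = data().size.toLong
  if (n % 2 == 1) kthSmallest(data, n / 2).toDouble
  else (kthSmallest(data, n / 2 - 1) + kthSmallest(data, n / 2)) / 2.0
}
```

This makes O(log(max - min)) passes per rank query, which is exactly the shape that today's single-pass AggregateFunction interface can't express.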

On Wed, Dec 15, 2021 at 11:00 AM Fitch, Simeon <fi...@astraea.io> wrote:

> Nicholas,
>
> This may or may not be much help, but in RasterFrames we have an
> approximate quantiles Expression computed against Tiles (2d geospatial
> arrays) which makes use of
> `org.apache.spark.sql.catalyst.util.QuantileSummaries` to do the hard work.
> So perhaps a directionally correct example of what you're looking to do?
>
>
> https://github.com/locationtech/rasterframes/blob/develop/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala
>
> In that same package are a number of other Aggregates, including
> declarative ones, which are another way of computing aggregations through
> composition of other Expressions.
>
> Simeon
>
>
>
>
>
> On Thu, Dec 9, 2021 at 9:26 PM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> I'm trying to create a new aggregate function. It's my first time working
>> with Catalyst, so it's exciting---but I'm also in a bit over my head.
>>
>> My goal is to create a function to calculate the median
>> <https://issues.apache.org/jira/browse/SPARK-26589>.
>>
>> As a very simple solution, I could just define median to be an alias of
>> `Percentile(col, 0.5)`. However, the leading comment on the Percentile expression
>> <https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L37-L39>
>> highlights that it's very memory-intensive and can easily lead to
>> OutOfMemory errors.
>>
>> So instead of using Percentile, I'm trying to create an Expression that
>> calculates the median without needing to hold everything in memory at once.
>> I'm considering two different approaches:
>>
>> 1. Define Median as a combination of existing expressions: The median
>> can perhaps be built out of the existing expressions for Count
>> <https://github.com/apache/spark/blob/9af338cd685bce26abbc2dd4d077bde5068157b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala#L48>
>> and NthValue
>> <https://github.com/apache/spark/blob/568ad6aa4435ce76ca3b5d9966e64259ea1f9b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L675>
>> .
>>
>> I don't see a template I can follow for building a new expression out of
>> existing expressions (i.e. without having to implement a bunch of methods
>> for DeclarativeAggregate or ImperativeAggregate). I also don't know how I
>> would wrap NthValue to make it usable as a regular aggregate function. The
>> wrapped NthValue would need an implicit window that provides the necessary
>> ordering.
>>
>>
>> Is there any potential to this idea? Any pointers on how to implement it?
>>
>>
>> 2. Another memory-light approach to calculating the median requires
>> multiple passes over the data to converge on the answer. The approach is 
>> described
>> here
>> <https://www.quora.com/Distributed-Algorithms/What-is-the-distributed-algorithm-to-determine-the-median-of-arrays-of-integers-located-on-different-computers>.
>> (I posted a sketch implementation of this approach using Spark's user-level
>> API here
>> <https://issues.apache.org/jira/browse/SPARK-26589?focusedCommentId=17452081&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17452081>
>> .)
>>
>> I am also struggling to understand how I would build an aggregate
>> function like this, since it requires multiple passes over the data. From
>> what I can see, Catalyst's aggregate functions are designed to work with a
>> single pass over the data.
>>
>> We don't seem to have an interface for AggregateFunction that supports
>> multiple passes over the data. Is there some way to do this?
>>
>>
>> Again, this is my first serious foray into Catalyst. Any specific
>> implementation guidance is appreciated!
>>
>> Nick
>>
>>
>
> --
> Simeon Fitch
> Co-founder & VP of R&D
> Astraea, Inc.
>
>
