[ 
https://issues.apache.org/jira/browse/SPARK-26589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452081#comment-17452081
 ] 

Nicholas Chammas commented on SPARK-26589:
------------------------------------------

[~srowen] - I'll ask for help on the dev list if appropriate, but I'm wondering 
if you can give me some high-level guidance here.

I have an outline of an approach to calculate the median that does not require 
sorting or shuffling the data. It's based on the approach I linked to in my 
previous comment (by Michael Harris). It does require, however, multiple passes 
over the data for the algorithm to converge on the median.

Here's a working sketch of the approach:
{code:python}
from pyspark.sql.functions import col


def spark_median(data):
    total_count = data.count()
    if total_count == 0:
        # An empty dataset has no median.
        return None
    if total_count % 2 == 0:
        target_positions = [total_count // 2, total_count // 2 + 1]
    else:
        target_positions = [total_count // 2 + 1]
    target_values = [
        kth_position(data, k, data_count=total_count)
        for k in target_positions
    ]
    return sum(target_values) / len(target_values)


def kth_position(data, k, data_count=None):
    if data_count is None:
        total_count = data.count()
    else:
        total_count = data_count
    if k > total_count or k < 1:
        return None
    while True:
        # This value, along with the following two counts, is the only data
        # that needs to be shared across nodes.
        some_value = data.first()["id"]
        # These two counts can be performed together via an aggregator.
        larger_count = data.where(col("id") > some_value).count()
        equal_count = data.where(col("id") == some_value).count()
        value_positions = range(
            total_count - larger_count - equal_count + 1,
            total_count - larger_count + 1,
        )
        # print(some_value, total_count, k, value_positions)
        if k in value_positions:
            return some_value
        elif k >= value_positions.stop:
            k -= (value_positions.stop - 1)
            data = data.where(col("id") > some_value)
            total_count = larger_count
        elif k < value_positions.start:
            data = data.where(col("id") < some_value)
            total_count -= (larger_count + equal_count)
{code}
Of course, this needs to be converted into a Catalyst Expression, but the basic 
idea is expressed there.
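
To make the control flow easier to check without a cluster, here is a minimal local sketch of the same selection loop over a plain Python list. The names {{kth_smallest}} and {{local_median}} are hypothetical and not part of Spark; the first element stands in for {{data.first()}}, and both counts are gathered in a single pass, as the comment in the sketch above suggests an aggregator could do.

```python
def kth_smallest(values, k):
    """Return the k-th smallest element (1-indexed) without sorting."""
    if k < 1 or k > len(values):
        return None
    remaining = list(values)
    while True:
        pivot = remaining[0]  # stand-in for data.first()
        # One pass over the remaining data yields both counts.
        smaller = 0
        equal = 0
        for v in remaining:
            if v < pivot:
                smaller += 1
            elif v == pivot:
                equal += 1
        if k <= smaller:
            # Target lies among the values below the pivot.
            remaining = [v for v in remaining if v < pivot]
        elif k <= smaller + equal:
            # Target position is occupied by the pivot itself.
            return pivot
        else:
            # Target lies above the pivot; shift k past what was discarded.
            k -= smaller + equal
            remaining = [v for v in remaining if v > pivot]


def local_median(values):
    """Exact median via one or two k-th position lookups."""
    n = len(values)
    if n == 0:
        return None
    positions = [n // 2, n // 2 + 1] if n % 2 == 0 else [n // 2 + 1]
    picked = [kth_smallest(values, k) for k in positions]
    return sum(picked) / len(picked)
```

Each iteration discards at least the pivot and its duplicates, so the loop terminates, but a poor pivot choice can still mean many passes over the data.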

I am looking at the definitions of 
[DeclarativeAggregate|https://github.com/apache/spark/blob/1dd0ca23f64acfc7a3dc697e19627a1b74012a2d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L381-L394]
 and 
[ImperativeAggregate|https://github.com/apache/spark/blob/1dd0ca23f64acfc7a3dc697e19627a1b74012a2d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L267-L285]
 and trying to find an existing expression to model after, but I don't think 
any existing aggregate works like this median. Specifically, it requires 
multiple passes over the data (in this case, to count elements matching 
different filters).
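
For what it's worth, a single pass on its own looks like an ordinary mergeable aggregate. The sketch below is plain Python, not Catalyst code: {{PivotCounts}}, {{count_partition}}, and {{count_all}} are hypothetical names, and the update/merge split is only an assumption about how one pass could be shaped. The part with no obvious home in a single-pass aggregate is the outer loop that picks the next pivot and filters the data.

```python
from dataclasses import dataclass


@dataclass
class PivotCounts:
    """Hypothetical per-pass buffer: counts relative to a fixed pivot."""
    smaller: int = 0
    equal: int = 0
    larger: int = 0

    def update(self, value, pivot):
        # Fold one input value into the buffer.
        if value < pivot:
            self.smaller += 1
        elif value == pivot:
            self.equal += 1
        else:
            self.larger += 1
        return self

    def merge(self, other):
        # Combine two partial buffers; order does not matter.
        return PivotCounts(
            self.smaller + other.smaller,
            self.equal + other.equal,
            self.larger + other.larger,
        )


def count_partition(partition, pivot):
    """Build a partial buffer for one partition of the data."""
    buf = PivotCounts()
    for value in partition:
        buf.update(value, pivot)
    return buf


def count_all(partitions, pivot):
    """Merge the partial buffers; only three integers cross partitions."""
    total = PivotCounts()
    for partition in partitions:
        total = total.merge(count_partition(partition, pivot))
    return total
```

For example, {{count_all([[1, 5, 3], [5, 9], []], 5)}} yields 2 smaller, 2 equal, and 1 larger value.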

Do you have any advice on how to approach converting this into a Catalyst 
expression?

There is an 
[NthValue|https://github.com/apache/spark/blob/568ad6aa4435ce76ca3b5d9966e64259ea1f9b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L648-L675]
 window expression, but I don't think I can build on it to make my median 
expression since a) median shouldn't be limited to window expressions, and b) 
NthValue requires a complete sort of the data, which I want to avoid.

> proper `median` method for spark dataframe
> ------------------------------------------
>
>                 Key: SPARK-26589
>                 URL: https://issues.apache.org/jira/browse/SPARK-26589
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Jan Gorecki
>            Priority: Minor
>
> I found multiple tickets asking for a median function to be implemented in 
> Spark. Most of those tickets link to "SPARK-6761 Approximate quantile" as a 
> duplicate. The thing is that approximate quantile is a workaround for the 
> lack of a median function. Thus I am filing this feature request for a proper, 
> exact (not approximate) median function. I am aware of the difficulties 
> that a distributed environment causes when computing a median; 
> nevertheless, I don't think those difficulties are a good enough reason to drop 
> the `median` function from the scope of Spark. I am not asking for an efficient 
> median but an exact median.


