Josh Rosen created SPARK-7965:
---------------------------------

             Summary: Wrong answers for queries with multiple window specs in the same expression
                 Key: SPARK-7965
                 URL: https://issues.apache.org/jira/browse/SPARK-7965
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.4.0
            Reporter: Josh Rosen


I think that Spark SQL may be returning incorrect answers for queries that use 
multiple window specifications within the same expression.  Here's an example 
that illustrates the problem.

Say that I have a table with a single numeric column and that I want to compute 
a cumulative distribution function over this column.  Let's call this table 
{{nums}}:

{code}
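// spark-shell predefines sc and sqlContext and imports sqlContext.implicits._;
// in Spark 1.4, window functions require sqlContext to be a HiveContext.
val nums = sc.parallelize(1 to 10).toDF("x")
nums.registerTempTable("nums")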
{code}

It's easy to compute a running sum over this column:

{code}
sqlContext.sql("""
    select sum(x) over (rows between unbounded preceding and current row) from nums
""").collect()

res29: Array[org.apache.spark.sql.Row] = Array([1], [3], [6], [10], [15], [21], [28], [36], [45], [55])
{code}
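As a sanity check that does not involve Spark, the expected running sums can be computed with plain Scala collections. This is only a reference sketch: it assumes the rows are visited in ascending order of {{x}}, which matches res29, although the window specs in these queries have no {{order by}}, so Spark does not guarantee that order.

{code}
// Reference computation in plain Scala (no Spark needed).
// Assumes rows are visited in ascending order of x, as in res29.
val xs: Seq[Int] = 1 to 10

// scanLeft emits the seed 0 followed by each partial sum; tail drops the seed.
val runningSum: Seq[Int] = xs.scanLeft(0)(_ + _).tail

println(runningSum.mkString(", "))
{code}

This prints 1, 3, 6, 10, 15, 21, 28, 36, 45, 55, the same values as res29.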

It's also easy to compute a total sum over all rows:

{code}
sqlContext.sql("""
    select sum(x) over (rows between unbounded preceding and unbounded following) from nums
""").collect()

res34: Array[org.apache.spark.sql.Row] = Array([55], [55], [55], [55], [55], [55], [55], [55], [55], [55])
{code}
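A plain-Scala reference for the {{unbounded preceding and unbounded following}} frame: since there is no {{partition by}} clause, every row's frame covers the whole table, so every row should get the grand total. This is an illustrative sketch, not Spark code.

{code}
// Reference computation in plain Scala (no Spark needed).
val xs: Seq[Int] = 1 to 10

// With an unbounded-preceding/unbounded-following frame and no partitioning,
// each row's frame is the whole table, so every row gets the same total.
val total: Int = xs.sum
val totalSum: Seq[Int] = Seq.fill(xs.size)(total)

println(totalSum.mkString(", "))
{code}

This prints 55 ten times, matching res34.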

Let's say that I combine these expressions to compute a CDF:

{code}
sqlContext.sql("""
    select (sum(x) over (rows between unbounded preceding and current row))
         / (sum(x) over (rows between unbounded preceding and unbounded following))
    from nums
""").collect()

res31: Array[org.apache.spark.sql.Row] = Array([1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0])
{code}
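For comparison with res31, here is what the CDF should be if each row's running sum is divided by the grand total. This is a plain-Scala reference sketch, assuming rows are visited in ascending order of {{x}}; it says nothing about how Spark actually evaluates the query.

{code}
// Reference computation in plain Scala (no Spark needed).
// Assumes rows are visited in ascending order of x.
val xs: Seq[Int] = 1 to 10
val total: Double = xs.sum.toDouble

// Running sum for each row, then divide by the grand total.
val runningSum: Seq[Int] = xs.scanLeft(0)(_ + _).tail
val cdf: Seq[Double] = runningSum.map(_ / total)

cdf.foreach(v => println(f"$v%.4f"))
{code}

Only the last value is 1.0; the first is 1/55 (roughly 0.0182), so an all-1.0 result cannot be the correct CDF.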

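This seems wrong: each row should be its running sum divided by the total
(1/55, 3/55, 6/55, and so on up to 55/55), so only the last row should be 1.0.
Note that if we compute the running total, global total, and combined
expression in the same query, then we see that the first two values are
computed correctly but the combined expression seems to be incorrect: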

{code}
sqlContext.sql("""
    select
      sum(x) over (rows between unbounded preceding and current row) as running_sum,
      (sum(x) over (rows between unbounded preceding and unbounded following)) as total_sum,
      ((sum(x) over (rows between unbounded preceding and current row))
        / (sum(x) over (rows between unbounded preceding and unbounded following))) as combined
    from nums
""").collect()

res40: Array[org.apache.spark.sql.Row] = Array([1,55,1.0], [3,55,1.0], [6,55,1.0], [10,55,1.0], [15,55,1.0], [21,55,1.0], [28,55,1.0], [36,55,1.0], [45,55,1.0], [55,55,1.0])
{code}
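One way to make the inconsistency in res40 explicit is to check each returned row against its own first two columns: {{combined}} should equal {{running_sum / total_sum}}. The sketch below hard-codes the rows from res40 as plain Scala tuples; the {{consistent}} helper is hypothetical and only for illustration.

{code}
// Rows copied from res40 as (running_sum, total_sum, combined).
val observed: Seq[(Int, Int, Double)] = Seq(
  (1, 55, 1.0), (3, 55, 1.0), (6, 55, 1.0), (10, 55, 1.0), (15, 55, 1.0),
  (21, 55, 1.0), (28, 55, 1.0), (36, 55, 1.0), (45, 55, 1.0), (55, 55, 1.0)
)

// Hypothetical helper: a row is consistent if combined == running / total,
// allowing a small tolerance for floating-point rounding.
def consistent(row: (Int, Int, Double)): Boolean = {
  val (running, total, combined) = row
  math.abs(combined - running.toDouble / total) < 1e-9
}

val bad = observed.filterNot(consistent)
println(s"${bad.size} of ${observed.size} rows are inconsistent")
bad.foreach { case (r, t, c) =>
  println(s"running=$r total=$t combined=$c expected=${r.toDouble / t}")
}
{code}

With the rows from res40, every row except the last one (55/55) is flagged.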

/cc [~yhuai]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
