Jarno Seppanen created SPARK-11405:
--------------------------------------

             Summary: ROW_NUMBER function does not adhere to window ORDER BY, when joining
                 Key: SPARK-11405
                 URL: https://issues.apache.org/jira/browse/SPARK-11405
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.5.0
         Environment: YARN
            Reporter: Jarno Seppanen


The following query produces incorrect results:
{code:sql}
  SELECT a.i, a.x,
    ROW_NUMBER() OVER (
      PARTITION BY a.i ORDER BY a.x) AS row_num
  FROM a
  JOIN b ON b.i = a.i;
+---+--------------------+-------+
|  i|                   x|row_num|
+---+--------------------+-------+
|  1|  0.8717439935587555|      1|
|  1|  0.6684483939068196|      2|
|  1|  0.3378351523586306|      3|
|  1|  0.2483285619632939|      4|
|  1|  0.4796752841655936|      5|
|  2|  0.2971739640384895|      1|
|  2|  0.2199359901600595|      2|
|  2|  0.4646004597998037|      3|
|  2| 0.24823688829578183|      4|
|  2|  0.5914212915574378|      5|
|  3|0.010912835935112164|      1|
|  3|  0.6520139509583123|      2|
|  3|  0.8571994559240592|      3|
|  3|  0.1122635843020473|      4|
|  3| 0.45913022936460457|      5|
+---+--------------------+-------+
{code}

The row numbers do not follow the window's ORDER BY a.x. The join appears to break the ordering: ROW_NUMBER() works correctly if the join results are first saved to a temporary table and the window function is applied in a second query.
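For reference, the numbering the query should produce can be sketched in plain Python. This is a hypothetical helper (not Spark's implementation), assuming the rows are held as dicts: it sorts by the partition column, then by the ORDER BY column, and numbers rows from 1 within each partition. The sample data are the i = 1 rows of table a, copied from the af.show() output in the test case.

```python
from itertools import groupby
from operator import itemgetter


def row_number(rows, partition_key, order_key):
    """Mimic ROW_NUMBER() OVER (PARTITION BY partition_key ORDER BY order_key).

    Hypothetical reference helper: takes a list of dicts and returns new dicts
    with a 1-based "row_num" added. Ties on order_key keep their input order,
    because sorted() is stable.
    """
    # Sort by the partition column first, then by the window's ORDER BY column
    ordered = sorted(rows, key=lambda r: (r[partition_key], r[order_key]))
    numbered = []
    # groupby needs its input sorted by partition_key, which `ordered` is
    for _, partition in groupby(ordered, key=itemgetter(partition_key)):
        for n, row in enumerate(partition, start=1):
            numbered.append(dict(row, row_num=n))
    return numbered


# The i = 1 rows of table a, copied from the af.show() output
rows_i1 = [
    {"i": 1, "x": 0.4796752841655936},
    {"i": 1, "x": 0.8717439935587555},
    {"i": 1, "x": 0.6684483939068196},
    {"i": 1, "x": 0.3378351523586306},
    {"i": 1, "x": 0.2483285619632939},
]

for r in row_number(rows_i1, "i", "x"):
    print(r["i"], r["x"], r["row_num"])
```

Row 1 goes to the smallest x (0.2483285619632939) and row 5 to the largest (0.8717439935587555), which matches the workaround output; the joined query instead assigns row 1 to 0.8717439935587555.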

Here's a small PySpark test case to reproduce the error:

{code}

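# Run in the PySpark shell (Spark 1.5), where sc and sqlContext are predefined.
# Window functions in Spark 1.5 SQL require sqlContext to be a HiveContext.
from pyspark.sql import Row
import random

# Five rows per key i = 0..4, each with a random x (values differ per run)
a = sc.parallelize([Row(i=i, x=random.random())
                    for i in range(5)
                    for _ in range(5)])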

b = sc.parallelize([Row(i=i) for i in [1, 2, 3]])

af = sqlContext.createDataFrame(a)
bf = sqlContext.createDataFrame(b)
af.registerTempTable('a')
bf.registerTempTable('b')

af.show()
# +---+--------------------+
# |  i|                   x|
# +---+--------------------+
# |  0| 0.12978974167478896|
# |  0|  0.7105927498584452|
# |  0| 0.21225679077448045|
# |  0| 0.03849717391728036|
# |  0|  0.4976622146442401|
# |  1|  0.4796752841655936|
# |  1|  0.8717439935587555|
# |  1|  0.6684483939068196|
# |  1|  0.3378351523586306|
# |  1|  0.2483285619632939|
# |  2|  0.2971739640384895|
# |  2|  0.2199359901600595|
# |  2|  0.5914212915574378|
# |  2| 0.24823688829578183|
# |  2|  0.4646004597998037|
# |  3|  0.1122635843020473|
# |  3|  0.6520139509583123|
# |  3| 0.45913022936460457|
# |  3|0.010912835935112164|
# |  3|  0.8571994559240592|
# +---+--------------------+
# only showing top 20 rows

bf.show()
# +---+
# |  i|
# +---+
# |  1|
# |  2|
# |  3|
# +---+

### WRONG: row_num does not follow ORDER BY a.x

sqlContext.sql("""
  SELECT a.i, a.x,
    ROW_NUMBER() OVER (
      PARTITION BY a.i ORDER BY a.x) AS row_num
  FROM a
  JOIN b ON b.i = a.i
""").show()
# +---+--------------------+-------+
# |  i|                   x|row_num|
# +---+--------------------+-------+
# |  1|  0.8717439935587555|      1|
# |  1|  0.6684483939068196|      2|
# |  1|  0.3378351523586306|      3|
# |  1|  0.2483285619632939|      4|
# |  1|  0.4796752841655936|      5|
# |  2|  0.2971739640384895|      1|
# |  2|  0.2199359901600595|      2|
# |  2|  0.4646004597998037|      3|
# |  2| 0.24823688829578183|      4|
# |  2|  0.5914212915574378|      5|
# |  3|0.010912835935112164|      1|
# |  3|  0.6520139509583123|      2|
# |  3|  0.8571994559240592|      3|
# |  3|  0.1122635843020473|      4|
# |  3| 0.45913022936460457|      5|
# +---+--------------------+-------+

### WORKAROUND BY USING TEMP TABLE

t = sqlContext.sql("""
  SELECT a.i, a.x
  FROM a
  JOIN b ON b.i = a.i
""").cache()
# trigger computation
t.head()
t.registerTempTable('t')

sqlContext.sql("""
  SELECT i, x,
    ROW_NUMBER() OVER (
      PARTITION BY i ORDER BY x) AS row_num
  FROM t
""").show()
# +---+--------------------+-------+
# |  i|                   x|row_num|
# +---+--------------------+-------+
# |  1|  0.2483285619632939|      1|
# |  1|  0.3378351523586306|      2|
# |  1|  0.4796752841655936|      3|
# |  1|  0.6684483939068196|      4|
# |  1|  0.8717439935587555|      5|
# |  2|  0.2199359901600595|      1|
# |  2| 0.24823688829578183|      2|
# |  2|  0.2971739640384895|      3|
# |  2|  0.4646004597998037|      4|
# |  2|  0.5914212915574378|      5|
# |  3|0.010912835935112164|      1|
# |  3|  0.1122635843020473|      2|
# |  3| 0.45913022936460457|      3|
# |  3|  0.6520139509583123|      4|
# |  3|  0.8571994559240592|      5|
# +---+--------------------+-------+
{code}
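To check outputs like the two above mechanically rather than by eye, a small helper can verify that, within each partition, row_num runs 1..n and x is non-decreasing in row_num order. This is a hypothetical helper, not part of PySpark; it assumes the result is a list of (i, x, row_num) tuples, here hand-copied from the i = 1 rows of the WRONG and WORKAROUND outputs.

```python
from collections import defaultdict


def follows_order_by(result):
    """Return True if, within each partition i, row_num increases with x.

    Hypothetical checker: `result` is a list of (i, x, row_num) tuples.
    """
    by_partition = defaultdict(list)
    for i, x, row_num in result:
        by_partition[i].append((row_num, x))
    for rows in by_partition.values():
        rows.sort()  # put each partition in row_num order
        nums = [n for n, _ in rows]
        xs = [x for _, x in rows]
        # Row numbers must be exactly 1..n with no gaps or repeats
        if nums != list(range(1, len(rows) + 1)):
            return False
        # Under ORDER BY x, x must be non-decreasing as row_num grows
        if xs != sorted(xs):
            return False
    return True


# i = 1 rows from the joined (WRONG) query
wrong_i1 = [(1, 0.8717439935587555, 1), (1, 0.6684483939068196, 2),
            (1, 0.3378351523586306, 3), (1, 0.2483285619632939, 4),
            (1, 0.4796752841655936, 5)]
# i = 1 rows from the temp-table WORKAROUND query
workaround_i1 = [(1, 0.2483285619632939, 1), (1, 0.3378351523586306, 2),
                 (1, 0.4796752841655936, 3), (1, 0.6684483939068196, 4),
                 (1, 0.8717439935587555, 5)]

print(follows_order_by(wrong_i1))       # False
print(follows_order_by(workaround_i1))  # True
```

Since PySpark Row objects are tuples, the collected rows of either query should also be accepted directly by this checker.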



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
