Harry Weppner created SPARK-19032:
-------------------------------------

             Summary: Non-deterministic results using aggregation first across 
multiple workers
                 Key: SPARK-19032
                 URL: https://issues.apache.org/jira/browse/SPARK-19032
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 1.6.1
         Environment: Standalone Spark 1.6.1 cluster on EC2 with 2 worker 
nodes, one executor each.
            Reporter: Harry Weppner


We've come across a situation where results aggregated using {{first}} on a sorted df 
are non-deterministic. The query plan suggests a plausible explanation, but it raises 
questions about the usefulness of these aggregation functions in a Spark cluster.

Here's a minimal example to reproduce:

{code}
val df = 
sc.parallelize(Seq(("a","prod1",0.6),("a","prod2",0.4),("a","prod2",0.4),("a","prod2",0.4),("a","prod2",0.4))).toDF("account","product","probability")
var p = 
df.sort($"probability".desc).groupBy($"account").agg(first($"product"),first($"probability")).show();

+-------+----------------+--------------------+
|account|first(product)()|first(probability)()|
+-------+----------------+--------------------+
|      a|           prod1|                 0.6|
+-------+----------------+--------------------+

p: Unit = ()

// Repeat and notice that the result will occasionally differ

+-------+----------------+--------------------+
|account|first(product)()|first(probability)()|
+-------+----------------+--------------------+
|      a|           prod2|                 0.4|
+-------+----------------+--------------------+

p: Unit = ()

scala> 
df.sort($"probability".desc).groupBy($"account").agg(first($"product"),first($"probability")).explain(true);
== Parsed Logical Plan ==
'Aggregate ['account], 
[unresolvedalias('account),(first('product)(),mode=Complete,isDistinct=false) 
AS first(product)()#523,(first('probability)(),mode=Complete,isDistinct=false) 
AS first(probability)()#524]
+- Sort [probability#5 DESC], true
   +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
      +- LogicalRDD [_1#0,_2#1,_3#2], MapPartitionsRDD[1] at 
rddToDataFrameHolder at <console>:27

== Analyzed Logical Plan ==
account: string, first(product)(): string, first(probability)(): double
Aggregate [account#3], 
[account#3,(first(product#4)(),mode=Complete,isDistinct=false) AS 
first(product)()#523,(first(probability#5)(),mode=Complete,isDistinct=false) AS 
first(probability)()#524]
+- Sort [probability#5 DESC], true
   +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
      +- LogicalRDD [_1#0,_2#1,_3#2], MapPartitionsRDD[1] at 
rddToDataFrameHolder at <console>:27

== Optimized Logical Plan ==
Aggregate [account#3], 
[account#3,(first(product#4)(),mode=Complete,isDistinct=false) AS 
first(product)()#523,(first(probability#5)(),mode=Complete,isDistinct=false) AS 
first(probability)()#524]
+- Sort [probability#5 DESC], true
   +- Project [_1#0 AS account#3,_2#1 AS product#4,_3#2 AS probability#5]
      +- LogicalRDD [_1#0,_2#1,_3#2], MapPartitionsRDD[1] at 
rddToDataFrameHolder at <console>:27

== Physical Plan ==
SortBasedAggregate(key=[account#3], 
functions=[(first(product#4)(),mode=Final,isDistinct=false),(first(probability#5)(),mode=Final,isDistinct=false)],
 output=[account#3,first(product)()#523,first(probability)()#524])
+- ConvertToSafe
   +- Sort [account#3 ASC], false, 0
      +- TungstenExchange hashpartitioning(account#3,200), None
         +- ConvertToUnsafe
            +- SortBasedAggregate(key=[account#3], 
functions=[(first(product#4)(),mode=Partial,isDistinct=false),(first(probability#5)(),mode=Partial,isDistinct=false)],
 output=[account#3,first#532,valueSet#533,first#534,valueSet#535])
               +- ConvertToSafe
                  +- Sort [account#3 ASC], false, 0
                     +- Sort [probability#5 DESC], true, 0
                        +- ConvertToUnsafe
                           +- Exchange rangepartitioning(probability#5 
DESC,200), None
                              +- ConvertToSafe
                                 +- Project [_1#0 AS account#3,_2#1 AS 
product#4,_3#2 AS probability#5]
                                    +- Scan ExistingRDD[_1#0,_2#1,_3#2]
{code}

My working hypothesis is that after the {{TungstenExchange hashpartitioning}} the 
_global_ sort order on {{probability}} is lost, leading to non-deterministic 
results.
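
A rough way to inspect this (a sketch only; it assumes the schema from the example 
above, with {{probability}} at column index 2) is to dump the per-partition ordering 
of {{probability}} before and after a hash repartition on {{account}}, analogous to 
the exchange in the plan:

{code}
val sorted = df.sort($"probability".desc)

// Globally sorted: range-partitioned, descending across partitions.
sorted.rdd
  .mapPartitionsWithIndex((i, it) => Iterator((i, it.map(_.getDouble(2)).toList)))
  .filter(_._2.nonEmpty)
  .collect().foreach(println)

// After a hash exchange on account (analogous to TungstenExchange
// hashpartitioning in the plan) all rows for an account end up in one
// partition, but their order within it is not guaranteed.
sorted.repartition($"account").rdd
  .mapPartitionsWithIndex((i, it) => Iterator((i, it.map(_.getDouble(2)).toList)))
  .filter(_._2.nonEmpty)
  .collect().foreach(println)
{code}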

If this hypothesis is valid, then how useful are aggregation functions such as 
{{first}}, {{last}} and possibly others in Spark?

It appears that the use of window functions could address the ambiguity by 
making the partitioning and ordering explicit, but I'd be interested in your 
assessment. Thanks!
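
For reference, here is a minimal sketch of that alternative (assuming a HiveContext, 
which window functions require on 1.6, and the column names from the example above):

{code}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Partitioning and ordering are stated explicitly, so the top row per account
// should not depend on how the data happens to be shuffled across workers.
val w = Window.partitionBy($"account").orderBy($"probability".desc)
df.withColumn("rn", row_number().over(w))
  .where($"rn" === 1)
  .drop("rn")
  .show()
{code}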


