[ 
https://issues.apache.org/jira/browse/SPARK-6416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15076704#comment-15076704
 ] 

Mark Hamstra commented on SPARK-6416:
-------------------------------------

I still don't see RDD#fold as being out of line with what the Scala parallel 
collections model should lead you to expect. There, too, you can get confusing 
results if you don't pay attention to the partitioned nature of the operation:
{code}
scala> val list1 = (1 to 10000).toList

scala> val list2 = (1 to 100).toList

scala> list1.fold(0){ case (a, b) => a + 1 }
res0: Int = 10000

scala> list1.par.fold(0){ case (a, b) => a + 1 }
res1: Int = 162

scala> list2.fold(0){ case (a, b) => a + 1 }
res2: Int = 100

scala> list2.par.fold(0){ case (a, b) => a + 1 }
res3: Int = 7
{code}
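
The same effect can be reproduced without Spark or parallel collections. The following is only a rough model, not how Scala actually schedules a parallel fold: {{chunked_fold}} and its fixed, contiguous chunking are assumptions made for illustration, whereas the real splitting is up to the runtime. It shows that an operator which ignores its second argument counts combine steps rather than elements, so the result tracks the chunking, while addition is unaffected:

```python
from functools import reduce


def chunked_fold(items, num_chunks, zero, op):
    """Fold each chunk starting from `zero`, then fold the per-chunk results.

    Hypothetical helper: contiguous, equally sized chunks stand in for
    whatever splitting a parallel runtime would really choose.
    """
    size = max(1, -(-len(items) // num_chunks))  # ceiling division
    chunks = [items[i:i + size] for i in range(0, len(items), size)]
    partials = [reduce(op, chunk, zero) for chunk in chunks]
    return reduce(op, partials, zero)


count_op = lambda a, b: a + 1   # ignores b, like the operator above
add_op = lambda a, b: a + b     # associative and commutative, 0 is neutral

data = list(range(1, 101))

sequential = reduce(count_op, data, 0)         # one increment per element
chunked = chunked_fold(data, 4, 0, count_op)   # one increment per chunk
stable = chunked_fold(data, 4, 0, add_op)      # same as a sequential sum

print(sequential, chunked, stable)
```

Under these assumptions the sequential fold counts 100 elements, the chunked fold counts only the 4 partial results, and the sum is 5050 either way.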

> RDD.fold() requires the operator to be commutative
> --------------------------------------------------
>
>                 Key: SPARK-6416
>                 URL: https://issues.apache.org/jira/browse/SPARK-6416
>             Project: Spark
>          Issue Type: Bug
>          Components: Documentation, Spark Core
>            Reporter: Josh Rosen
>            Priority: Critical
>
> Spark's {{RDD.fold}} operation has some confusing behaviors when a 
> non-commutative reduce function is used.
> Here's an example, which was originally reported on StackOverflow 
> (https://stackoverflow.com/questions/29150202/pyspark-fold-method-output):
> {code}
> sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 )
> 8
> {code}
> To understand what's going on here, let's look at the definition of Spark's 
> `fold` operation.  
> I'm going to show the Python version of the code, but the Scala version 
> exhibits the exact same behavior (you can also [browse the source on 
> GitHub|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/python/pyspark/rdd.py#L780]):
> {code}
>     def fold(self, zeroValue, op):
>         """
>         Aggregate the elements of each partition, and then the results for all
>         the partitions, using a given associative function and a neutral "zero
>         value."
>         The function C{op(t1, t2)} is allowed to modify C{t1} and return it
>         as its result value to avoid object allocation; however, it should not
>         modify C{t2}.
>         >>> from operator import add
>         >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
>         15
>         """
>         def func(iterator):
>             acc = zeroValue
>             for obj in iterator:
>                 acc = op(obj, acc)
>             yield acc
>         vals = self.mapPartitions(func).collect()
>         return reduce(op, vals, zeroValue)
> {code}
> (For comparison, see the [Scala implementation of 
> `RDD.fold`|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L943]).
> Spark's `fold` operates by first folding each partition and then folding the 
> results.  The problem is that an empty partition gets folded down to the zero 
> element, so the final driver-side fold ends up folding one value for _every_ 
> partition rather than one value for each _non-empty_ partition.  This means 
> that the result of `fold` is sensitive to the number of partitions:
> {code}
>     >>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
>     100
>     >>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
>     50
>     >>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
>     1
> {code}
> In this last case, what's happening is that the single partition is folded 
> down to one value, and that value is then folded with the zero-value at the 
> driver. Because the lambda ignores its second argument, that final step 
> yields 0 + 1 = 1.
> I think the underlying problem here is that our fold() operation implicitly 
> requires the operator to be commutative in addition to associative, but this 
> isn't documented anywhere.  Due to ordering non-determinism elsewhere in 
> Spark, such as SPARK-5750, I don't think there's an easy way to fix this.  
> Therefore, I think we should update the documentation and examples to clarify 
> this requirement and explain that our fold acts more like a reduce with a 
> default value than the type of ordering-sensitive fold() that users may 
> expect in functional languages.
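
The behavior described in the issue can be checked locally with a small sketch that mirrors the quoted PySpark code on in-memory lists. {{local_fold}} and {{split}} are hypothetical names, and the contiguous slicing in {{split}} is an assumption that may not match how {{sc.parallelize}} distributes elements. The sketch keeps the {{op(obj, acc)}} argument order from the quoted source, which is what makes a non-commutative operator such as string concatenation come out in an unexpected order:

```python
from functools import reduce


def local_fold(partitions, zero_value, op):
    """Mimic the quoted PySpark fold on a list of in-memory partitions."""
    def fold_partition(partition):
        acc = zero_value
        for obj in partition:
            acc = op(obj, acc)  # same argument order as the quoted source
        return acc

    # An empty partition folds down to zero_value and still takes part
    # in the final driver-side reduce.
    vals = [fold_partition(p) for p in partitions]
    return reduce(op, vals, zero_value)


def split(items, num_partitions):
    """Hypothetical splitter: contiguous slices, some of them possibly empty."""
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


data = [1, 25, 8, 4, 2]
count_op = lambda a, b: a + 1
add_op = lambda a, b: a + b
concat_op = lambda a, b: a + b  # on strings: associative, not commutative

counts = [local_fold(split(data, n), 0, count_op) for n in (100, 50, 1)]
sums = [local_fold(split(data, n), 0, add_op) for n in (100, 50, 1)]
letters = local_fold(split(["a", "b", "c", "d"], 2), "", concat_op)

print(counts, sums, letters)
```

With these assumptions the counting lambda returns the number of partitions (100, 50, 1), addition returns 40 for every partition count, and concatenating a, b, c, d over two partitions gives "badc" rather than "abcd".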



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
