Josh Rosen created SPARK-6416:
---------------------------------

             Summary: RDD.fold() requires the operator to be commutative
                 Key: SPARK-6416
                 URL: https://issues.apache.org/jira/browse/SPARK-6416
             Project: Spark
          Issue Type: Bug
          Components: Documentation, Spark Core
            Reporter: Josh Rosen
            Priority: Critical


Spark's {{RDD.fold}} operation has some confusing behaviors when a 
non-commutative reduce function is used.

Here's an example, which was originally reported on StackOverflow 
(https://stackoverflow.com/questions/29150202/pyspark-fold-method-output):

{code}
>>> sc.parallelize([1, 25, 8, 4, 2]).fold(0, lambda a, b: a + 1)
8
{code}

To understand what's going on here, let's look at the definition of Spark's 
{{fold}} operation.

I'm going to show the Python version of the code, but the Scala version 
exhibits the exact same behavior (you can also [browse the source on 
GitHub|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/python/pyspark/rdd.py#L780]):

{code}
    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."
        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.
        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)
{code}

(For comparison, see the [Scala implementation of 
{{RDD.fold}}|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L943]).
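
Note the crucial detail in {{func}} above: it yields the accumulator unconditionally, so an empty partition produces the zero value.  Here's a minimal standalone sketch of that per-partition step (plain Python, no SparkContext; {{fold_partition}} is just my name for the same logic as {{func}}, using the non-commutative lambda from the example):

{code}
def fold_partition(iterator, zeroValue=0, op=lambda a, b: a + 1):
    # Same shape as func() above: start from the zero value and fold every
    # element of the partition into the accumulator.
    acc = zeroValue
    for obj in iterator:
        acc = op(obj, acc)
    yield acc

print(list(fold_partition(iter([]))))    # [0]  -- an empty partition still produces the zero value
print(list(fold_partition(iter([25]))))  # [26] -- op(25, 0) == 25 + 1
{code}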

Spark's {{fold}} operates by first folding each partition and then folding the 
results.  The problem is that an empty partition gets folded down to the zero 
element, so the final driver-side fold ends up folding one value for _every_ 
partition rather than one value for each _non-empty_ partition.  This means 
that the result of {{fold}} is sensitive to the number of partitions:

{code}
>>> sc.parallelize([1, 25, 8, 4, 2], 100).fold(0, lambda a, b: a + 1)
100
>>> sc.parallelize([1, 25, 8, 4, 2], 50).fold(0, lambda a, b: a + 1)
50
>>> sc.parallelize([1, 25, 8, 4, 2], 1).fold(0, lambda a, b: a + 1)
1
{code}

In this last case, what's happening is that the single partition is being 
folded down to the correct value, then that value is folded with the zero-value 
at the driver to yield 1.
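
To make the arithmetic concrete, here's a rough pure-Python reproduction of the 100-partition case.  I'm assuming one element per non-empty partition; the exact placement doesn't matter here, since the operator ignores its second argument:

{code}
from functools import reduce

zero = 0
op = lambda a, b: a + 1

# Five elements spread across 100 partitions leaves 95 partitions empty;
# fold each partition the same way func() above does.
partitions = [[1], [25], [8], [4], [2]] + [[] for _ in range(95)]
vals = [reduce(lambda acc, obj: op(obj, acc), part, zero) for part in partitions]

# Driver-side fold: op ignores its second argument, so it just adds one per
# partition value (including the 95 zeros), yielding 100.
print(reduce(op, vals, zero))  # 100
{code}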

I think the underlying problem here is that our fold() operation implicitly 
requires the operator to be commutative in addition to associative, but this 
isn't documented anywhere.  Due to ordering non-determinism elsewhere in Spark, 
such as SPARK-5750, I don't think there's an easy way to fix this.  Therefore, 
I think we should update the documentation and examples to clarify this 
requirement and explain that our fold acts more like a reduce with a default 
value than the type of ordering-sensitive fold() that users may expect in 
functional languages.
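
For example, a commutative and associative operator whose zero value is an identity behaves consistently no matter how the data is partitioned, while the accumulator-style left fold that {{functools.reduce}} allows (where the accumulator has a different type than the elements) isn't expressible with Spark's fold at all, since partition results are themselves merged with the same op.  A quick sketch of the contrast (assuming an active SparkContext {{sc}}, as in the examples above):

{code}
from functools import reduce
from operator import add

# Python's reduce() is an ordering-sensitive left fold whose accumulator can
# have a different type than the elements:
print(reduce(lambda acc, x: acc + [x * 2], [1, 2, 3], []))   # [2, 4, 6]

# Spark's fold() requires op to combine two values of the same type, because
# partition results are merged with that same op.  With a commutative,
# associative op and an identity zero value, the result does not depend on
# the number of partitions:
print(sc.parallelize([1, 2, 3, 4, 5], 10).fold(0, add))      # 15
print(sc.parallelize([1, 2, 3, 4, 5], 1).fold(0, add))       # 15
{code}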


