Repository: spark
Updated Branches:
  refs/heads/master c776fce99 -> fb7e21797


[SPARK-13339][DOCS] Clarify commutative / associative operator requirements for reduce, fold

Clarify that reduce functions need to be commutative, and fold functions do not

See https://github.com/apache/spark/pull/11091

Author: Sean Owen <so...@cloudera.com>

Closes #11217 from srowen/SPARK-13339.
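
As a quick illustration of the requirement being documented (a sketch, not part of the patch): reduceByKey merges values both map-side and after the shuffle, so the merge order is not fixed and the function must be commutative as well as associative. Assuming a local SparkContext `sc`:

    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 4)), 4)
    // Addition is associative and commutative, so the result is deterministic.
    val sums = pairs.reduceByKey(_ + _).collectAsMap()
    // Subtraction is neither, so this result can vary from run to run.
    val unstable = pairs.reduceByKey(_ - _).collectAsMap()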


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb7e2179
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb7e2179
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb7e2179

Branch: refs/heads/master
Commit: fb7e21797ed618d9754545a44f8f95f75b66757a
Parents: c776fce
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Feb 19 10:26:38 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Feb 19 10:26:38 2016 +0000

----------------------------------------------------------------------
 R/pkg/R/pairRDD.R                               | 10 +++---
 .../scala/org/apache/spark/Accumulator.scala    |  6 ++--
 .../org/apache/spark/api/java/JavaPairRDD.scala | 32 ++++++++++----------
 .../org/apache/spark/api/java/JavaRDDLike.scala |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 24 +++++++--------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  2 +-
 docs/programming-guide.md                       |  2 +-
 docs/streaming-programming-guide.md             |  4 +--
 python/pyspark/rdd.py                           |  7 ++---
 python/pyspark/streaming/dstream.py             |  4 +--
 .../streaming/api/java/JavaDStreamLike.scala    |  6 ++--
 .../streaming/api/java/JavaPairDStream.scala    | 18 +++++------
 .../spark/streaming/dstream/DStream.scala       |  4 +--
 .../dstream/PairDStreamFunctions.scala          | 16 +++++-----
 14 files changed, 68 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index f713114..4075ef4 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -305,11 +305,11 @@ setMethod("groupByKey",
 #'  Merge values by key
 #'
 #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function.
+#' and merges the values for each key using an associative and commutative reduce function.
 #'
 #' @param x The RDD to reduce by key. Should be an RDD where each element is
 #'             list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
+#' @param combineFunc The associative and commutative reduce function to use.
 #' @param numPartitions Number of partitions to create.
 #' @return An RDD where each element is list(K, V') where V' is the merged
 #'         value
@@ -347,12 +347,12 @@ setMethod("reduceByKey",
 #' Merge values by key locally
 #'
 #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function, but return the
-#' results immediately to the driver as an R list.
+#' and merges the values for each key using an associative and commutative reduce function, but
+#' return the results immediately to the driver as an R list.
 #'
 #' @param x The RDD to reduce by key. Should be an RDD where each element is
 #'             list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
+#' @param combineFunc The associative and commutative reduce function to use.
 #' @return A list of elements of type list(K, V') where V' is the merged value for each key
 #' @seealso reduceByKey
 #' @examples

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/Accumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 5e8f1d4..0e4bcc3 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -29,9 +29,9 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
 /**
  * A simpler value of [[Accumulable]] where the result type being accumulated is the same
  * as the types of elements being merged, i.e. variables that are only "added" to through an
- * associative operation and can therefore be efficiently supported in parallel. They can be used
- * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
- * value types, and programmers can add support for new types.
+ * associative and commutative operation and can therefore be efficiently supported in parallel.
+ * They can be used to implement counters (as in MapReduce) or sums. Spark natively supports
+ * accumulators of numeric value types, and programmers can add support for new types.
  *
  * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
  * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
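
A minimal usage sketch (not from this patch) of the contract the updated Scaladoc describes, assuming an existing SparkContext `sc`; tasks may apply the add operation in any order, which is why it should be associative and commutative:

    val errorCount = sc.accumulator(0, "errors")   // Int accumulator with a display name
    sc.parallelize(1 to 1000).foreach { i =>
      if (i % 100 == 0) errorCount += 1            // tasks only "add" to it
    }
    println(errorCount.value)                      // read the merged total on the driver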

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 94d1035..e080f91 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -278,17 +278,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce.
    */
   def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
     fromRDD(rdd.reduceByKey(partitioner, func))
 
   /**
-   * Merge the values for each key using an associative reduce function, but return the results
-   * immediately to the master as a Map. This will also perform the merging locally on each mapper
-   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function, but return
+   * the result immediately to the master as a Map. This will also perform the merging locally on
+   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
     mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))
@@ -381,9 +381,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(rdd.foldByKey(zeroValue)(func))
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
    */
   def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
     fromRDD(rdd.reduceByKey(func, numPartitions))
@@ -461,10 +461,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(rdd.partitionBy(partitioner))
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
-   */
+    * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+    * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
+    */
   def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
     fromRDD(rdd.join(other, partitioner))
 
@@ -520,9 +520,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
    * parallelism level.
    */
   def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 37c211f..4212027 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -373,7 +373,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative and commutative function and a neutral "zero value". The function
+   * given associative function and a neutral "zero value". The function
    * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
    * allocation; however, it should not modify t2.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 61905a8..e00b9f6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -300,27 +300,27 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce.
    */
   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
     combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
    */
   def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
     reduceByKey(new HashPartitioner(numPartitions), func)
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
    * parallelism level.
    */
   def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
@@ -328,9 +328,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative reduce function, but return the results
-   * immediately to the master as a Map. This will also perform the merging locally on each mapper
-   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function, but return
+   * the results immediately to the master as a Map. This will also perform the merging locally on
+   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
     val cleanedF = self.sparkContext.clean(func)
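
A short sketch (assumptions: a SparkContext `sc` and a toy dataset) of the two variants documented above; both rely on the reduce function being associative and commutative because map-side combining reorders the merges:

    val words = sc.parallelize(Seq(("spark", 1), ("rdd", 1), ("spark", 1)))
    // reduceByKey returns a shuffled RDD of merged values per key.
    val counted: org.apache.spark.rdd.RDD[(String, Int)] = words.reduceByKey(_ + _)
    // reduceByKeyLocally skips the output RDD and hands a Map back to the driver.
    val onDriver: scala.collection.Map[String, Int] = words.reduceByKeyLocally(_ + _)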

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a81a98b..6a6ad2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -973,7 +973,7 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative and commutative function and a neutral "zero value". The function
+   * given associative function and a neutral "zero value". The function
    * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
    * allocation; however, it should not modify t2.
    *
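
A minimal sketch (not in the commit) of RDD.fold as the revised doc describes it: an associative op plus a neutral zero value. Using an op that is also commutative, such as addition, keeps the result independent of the order in which partition results arrive; `sc` is an assumed SparkContext.

    val nums = sc.parallelize(1 to 100, 8)
    val total = nums.fold(0)(_ + _)   // 0 is the neutral element for +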

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index e450814..2d6f776 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1343,7 +1343,7 @@ value of the broadcast variable (e.g. if the variable is shipped to a new node l
 
 ## Accumulators
 
-Accumulators are variables that are only "added" to through an associative operation and can
+Accumulators are variables that are only "added" to through an associative and commutative operation and can
 therefore be efficiently supported in parallel. They can be used to implement counters (as in
 MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers
 can add support for new types. If accumulators are created with a name, they will be

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 677f5ff..4d1932b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -798,7 +798,7 @@ Some of the common ones are as follows.
   <td> <b>reduce</b>(<i>func</i>) </td>
   <td> Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the
   source DStream using a function <i>func</i> (which takes two arguments and returns one).
-  The function should be associative so that it can be computed in parallel. </td>
+  The function should be associative and commutative so that it can be computed in parallel. </td>
 </tr>
 <tr>
   <td> <b>countByValue</b>() </td>
@@ -1072,7 +1072,7 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.
 <tr>
   <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowLength</i>, <i>slideInterval</i>) </td>
   <td> Return a new single-element stream, created by aggregating elements in the stream over a
-  sliding interval using <i>func</i>. The function should be associative so that it can be computed
+  sliding interval using <i>func</i>. The function should be associative and commutative so that it can be computed
   correctly in parallel.
   </td>
 </tr>
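
A hedged sketch (not from the patch) of the reduceByWindow call these table rows describe; the socket source and host/port are hypothetical and only give the stream a shape:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("window-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    val counts = ssc.socketTextStream("localhost", 9999).map(_ => 1L)
    // (+) is associative and commutative, so the windowed reduce can run in parallel.
    val perWindow = counts.reduceByWindow(_ + _, Seconds(30), Seconds(10))
    perWindow.print()
    ssc.start()
    ssc.awaitTermination()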

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fe2264a..4eaf589 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -844,8 +844,7 @@ class RDD(object):
     def fold(self, zeroValue, op):
         """
         Aggregate the elements of each partition, and then the results for all
-        the partitions, using a given associative and commutative function and
-        a neutral "zero value."
+        the partitions, using a given associative function and a neutral "zero value."
 
         The function C{op(t1, t2)} is allowed to modify C{t1} and return it
         as its result value to avoid object allocation; however, it should not
@@ -1558,7 +1557,7 @@ class RDD(object):
 
     def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
         """
-        Merge the values for each key using an associative reduce function.
+        Merge the values for each key using an associative and commutative reduce function.
 
         This will also perform the merging locally on each mapper before
         sending results to a reducer, similarly to a "combiner" in MapReduce.
@@ -1576,7 +1575,7 @@ class RDD(object):
 
     def reduceByKeyLocally(self, func):
         """
-        Merge the values for each key using an associative reduce function, but
+        Merge the values for each key using an associative and commutative reduce function, but
         return the results immediately to the master as a dictionary.
 
         This will also perform the merging locally on each mapper before

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 86447f5..2056663 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -453,7 +453,7 @@ class DStream(object):
         2. "inverse reduce" the old values that left the window (e.g., 
subtracting old counts)
         This is more efficient than `invReduceFunc` is None.
 
-        @param reduceFunc:     associative reduce function
+        @param reduceFunc:     associative and commutative reduce function
         @param invReduceFunc:  inverse reduce function of `reduceFunc`
         @param windowDuration: width of the window; must be a multiple of this DStream's
                                batching interval
@@ -524,7 +524,7 @@ class DStream(object):
         `invFunc` can be None, then it will reduce all the RDDs in window, could be slower
         than having `invFunc`.
 
-        @param func:           associative reduce function
+        @param func:           associative and commutative reduce function
         @param invFunc:        inverse function of `reduceFunc`
         @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index f10de48..9931a46 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -214,7 +214,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -234,7 +234,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -257,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    *  2. "inverse reduce" the old values that left the window (e.g., 
subtracting old counts)
    *  This is more efficient than reduceByWindow without "inverse reduce" 
function.
    *  However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this 
DStream's
    *                       batching interval

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index d718f1d..aad9a12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -138,8 +138,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
-   * with Spark's default number of partitions.
+   * merged using the associative and commutative reduce function. Hash partitioning is used to
+   * generate the RDDs with Spark's default number of partitions.
    */
   def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
     dstream.reduceByKey(func)
@@ -257,7 +257,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
    * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
    * the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    */
@@ -270,7 +270,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
    * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
    * generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -289,7 +289,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
    * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
    * generate the RDDs with `numPartitions` partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -309,7 +309,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
    * `DStream.reduceByKey()`, but applies it over a sliding window.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -335,7 +335,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
@@ -360,7 +360,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
@@ -397,7 +397,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *  2. "inverse reduce" the old values that left the window (e.g., 
subtracting old counts)
    * This is more efficient that reduceByKeyAndWindow without "inverse reduce" 
function.
    * However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse function
    * @param windowDuration width of the window; must be a multiple of this 
DStream's
    *                       batching interval

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index db79eea..70e1d8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -791,7 +791,7 @@ abstract class DStream[T: ClassTag] (
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -814,7 +814,7 @@ abstract class DStream[T: ClassTag] (
    *  2. "inverse reduce" the old values that left the window (e.g., 
subtracting old counts)
    *  This is more efficient than reduceByWindow without "inverse reduce" 
function.
    *  However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this 
DStream's
    *                       batching interval

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index babc722..1dcdb64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -75,8 +75,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
-   * with Spark's default number of partitions.
+   * merged using the associative and commutative reduce function. Hash partitioning is used to
+   * generate the RDDs with Spark's default number of partitions.
    */
   def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
     reduceByKey(reduceFunc, defaultPartitioner())
@@ -204,7 +204,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
    * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
    * the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    */
@@ -219,7 +219,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
    * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
    * generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -238,7 +238,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
    * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
    * generate the RDDs with `numPartitions` partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -259,7 +259,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
   /**
    * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
    * `DStream.reduceByKey()`, but applies it over a sliding window.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -289,7 +289,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
@@ -320,7 +320,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    *  2. "inverse reduce" the old values that left the window (e.g., 
subtracting old counts)
    * This is more efficient than reduceByKeyAndWindow without "inverse reduce" 
function.
    * However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc     associative reduce function
+   * @param reduceFunc     associative and commutative reduce function
    * @param invReduceFunc  inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this 
DStream's
    *                       batching interval
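
A sketch of the invertible form documented above (assumptions: a StreamingContext `ssc` with checkpointing enabled, a DStream `pairs: DStream[(String, Long)]`, and org.apache.spark.streaming.Seconds imported): new values entering the window are added with the reduce function and values leaving it are removed with its inverse.

    ssc.checkpoint("/tmp/spark-checkpoints")            // required for the inverse-reduce form
    val windowedCounts = pairs.reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,                      // associative and commutative reduce function
      (a: Long, b: Long) => a - b,                      // its inverse, applied to values leaving the window
      Seconds(60),                                      // window duration
      Seconds(10))                                      // slide duration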

