Repository: spark Updated Branches: refs/heads/master c776fce99 -> fb7e21797
[SPARK-13339][DOCS] Clarify commutative / associative operator requirements for reduce, fold

Clarify that reduce functions need to be commutative, and fold functions do not

See https://github.com/apache/spark/pull/11091

Author: Sean Owen <so...@cloudera.com>

Closes #11217 from srowen/SPARK-13339.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb7e2179
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb7e2179
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb7e2179

Branch: refs/heads/master
Commit: fb7e21797ed618d9754545a44f8f95f75b66757a
Parents: c776fce
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Feb 19 10:26:38 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Feb 19 10:26:38 2016 +0000

----------------------------------------------------------------------
 R/pkg/R/pairRDD.R | 10 +++---
 .../scala/org/apache/spark/Accumulator.scala | 6 ++--
 .../org/apache/spark/api/java/JavaPairRDD.scala | 32 ++++++++++----------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 24 +++++++--------
 .../main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
 docs/programming-guide.md | 2 +-
 docs/streaming-programming-guide.md | 4 +--
 python/pyspark/rdd.py | 7 ++---
 python/pyspark/streaming/dstream.py | 4 +--
 .../streaming/api/java/JavaDStreamLike.scala | 6 ++--
 .../streaming/api/java/JavaPairDStream.scala | 18 +++++------
 .../spark/streaming/dstream/DStream.scala | 4 +--
 .../dstream/PairDStreamFunctions.scala | 16 +++++-----
 14 files changed, 68 insertions(+), 69 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index f713114..4075ef4 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ 
-305,11 +305,11 @@ setMethod("groupByKey", #' Merge values by key #' #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V). -#' and merges the values for each key using an associative reduce function. +#' and merges the values for each key using an associative and commutative reduce function. #' #' @param x The RDD to reduce by key. Should be an RDD where each element is #' list(K, V) or c(K, V). -#' @param combineFunc The associative reduce function to use. +#' @param combineFunc The associative and commutative reduce function to use. #' @param numPartitions Number of partitions to create. #' @return An RDD where each element is list(K, V') where V' is the merged #' value @@ -347,12 +347,12 @@ setMethod("reduceByKey", #' Merge values by key locally #' #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V). -#' and merges the values for each key using an associative reduce function, but return the -#' results immediately to the driver as an R list. +#' and merges the values for each key using an associative and commutative reduce function, but +#' return the results immediately to the driver as an R list. #' #' @param x The RDD to reduce by key. Should be an RDD where each element is #' list(K, V) or c(K, V). -#' @param combineFunc The associative reduce function to use. +#' @param combineFunc The associative and commutative reduce function to use. 
#' @return A list of elements of type list(K, V') where V' is the merged value for each key #' @seealso reduceByKey #' @examples http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/Accumulator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala index 5e8f1d4..0e4bcc3 100644 --- a/core/src/main/scala/org/apache/spark/Accumulator.scala +++ b/core/src/main/scala/org/apache/spark/Accumulator.scala @@ -29,9 +29,9 @@ import org.apache.spark.storage.{BlockId, BlockStatus} /** * A simpler value of [[Accumulable]] where the result type being accumulated is the same * as the types of elements being merged, i.e. variables that are only "added" to through an - * associative operation and can therefore be efficiently supported in parallel. They can be used - * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric - * value types, and programmers can add support for new types. + * associative and commutative operation and can therefore be efficiently supported in parallel. + * They can be used to implement counters (as in MapReduce) or sums. Spark natively supports + * accumulators of numeric value types, and programmers can add support for new types. * * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]]. * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator. 
http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 94d1035..e080f91 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -278,17 +278,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions)) /** - * Merge the values for each key using an associative reduce function. This will also perform - * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. + * Merge the values for each key using an associative and commutative reduce function. This will + * also perform the merging locally on each mapper before sending results to a reducer, similarly + * to a "combiner" in MapReduce. */ def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] = fromRDD(rdd.reduceByKey(partitioner, func)) /** - * Merge the values for each key using an associative reduce function, but return the results - * immediately to the master as a Map. This will also perform the merging locally on each mapper - * before sending results to a reducer, similarly to a "combiner" in MapReduce. + * Merge the values for each key using an associative and commutative reduce function, but return + * the result immediately to the master as a Map. This will also perform the merging locally on + * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. 
*/ def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] = mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func)) @@ -381,9 +381,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) fromRDD(rdd.foldByKey(zeroValue)(func)) /** - * Merge the values for each key using an associative reduce function. This will also perform - * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. + * Merge the values for each key using an associative and commutative reduce function. This will + * also perform the merging locally on each mapper before sending results to a reducer, similarly + * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. */ def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.reduceByKey(func, numPartitions)) @@ -461,10 +461,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) fromRDD(rdd.partitionBy(partitioner)) /** - * Merge the values for each key using an associative reduce function. This will also perform - * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. - */ + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. + */ def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = fromRDD(rdd.join(other, partitioner)) @@ -520,9 +520,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) } /** - * Merge the values for each key using an associative reduce function. This will also perform - * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. 
Output will be hash-partitioned with the existing partitioner/ + * Merge the values for each key using an associative and commutative reduce function. This will + * also perform the merging locally on each mapper before sending results to a reducer, similarly + * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */ def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = { http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 37c211f..4212027 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -373,7 +373,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Aggregate the elements of each partition, and then the results for all the partitions, using a - * given associative and commutative function and a neutral "zero value". The function + * given associative function and a neutral "zero value". The function * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object * allocation; however, it should not modify t2. 
* http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 61905a8..e00b9f6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -300,27 +300,27 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** - * Merge the values for each key using an associative reduce function. This will also perform - * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. + * Merge the values for each key using an associative and commutative reduce function. This will + * also perform the merging locally on each mapper before sending results to a reducer, similarly + * to a "combiner" in MapReduce. */ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) } /** - * Merge the values for each key using an associative reduce function. This will also perform - * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. + * Merge the values for each key using an associative and commutative reduce function. This will + * also perform the merging locally on each mapper before sending results to a reducer, similarly + * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. */ def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope { reduceByKey(new HashPartitioner(numPartitions), func) } /** - * Merge the values for each key using an associative reduce function. 
This will also perform - * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ + * Merge the values for each key using an associative and commutative reduce function. This will + * also perform the merging locally on each mapper before sending results to a reducer, similarly + * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { @@ -328,9 +328,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** - * Merge the values for each key using an associative reduce function, but return the results - * immediately to the master as a Map. This will also perform the merging locally on each mapper - * before sending results to a reducer, similarly to a "combiner" in MapReduce. + * Merge the values for each key using an associative and commutative reduce function, but return + * the results immediately to the master as a Map. This will also perform the merging locally on + * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. 
*/ def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope { val cleanedF = self.sparkContext.clean(func) http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/rdd/RDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a81a98b..6a6ad2d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -973,7 +973,7 @@ abstract class RDD[T: ClassTag]( /** * Aggregate the elements of each partition, and then the results for all the partitions, using a - * given associative and commutative function and a neutral "zero value". The function + * given associative function and a neutral "zero value". The function * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object * allocation; however, it should not modify t2. * http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/docs/programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/programming-guide.md b/docs/programming-guide.md index e450814..2d6f776 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -1343,7 +1343,7 @@ value of the broadcast variable (e.g. if the variable is shipped to a new node l ## Accumulators -Accumulators are variables that are only "added" to through an associative operation and can +Accumulators are variables that are only "added" to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. 
If accumulators are created with a name, they will be http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/docs/streaming-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 677f5ff..4d1932b 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -798,7 +798,7 @@ Some of the common ones are as follows. <td> <b>reduce</b>(<i>func</i>) </td> <td> Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function <i>func</i> (which takes two arguments and returns one). - The function should be associative so that it can be computed in parallel. </td> + The function should be associative and commutative so that it can be computed in parallel. </td> </tr> <tr> <td> <b>countByValue</b>() </td> @@ -1072,7 +1072,7 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>. <tr> <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowLength</i>, <i>slideInterval</i>) </td> <td> Return a new single-element stream, created by aggregating elements in the stream over a - sliding interval using <i>func</i>. The function should be associative so that it can be computed + sliding interval using <i>func</i>. The function should be associative and commutative so that it can be computed correctly in parallel. </td> </tr> http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index fe2264a..4eaf589 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -844,8 +844,7 @@ class RDD(object): def fold(self, zeroValue, op): """ Aggregate the elements of each partition, and then the results for all - the partitions, using a given associative and commutative function and - a neutral "zero value." 
+ the partitions, using a given associative function and a neutral "zero value." The function C{op(t1, t2)} is allowed to modify C{t1} and return it as its result value to avoid object allocation; however, it should not @@ -1558,7 +1557,7 @@ class RDD(object): def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): """ - Merge the values for each key using an associative reduce function. + Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. @@ -1576,7 +1575,7 @@ class RDD(object): def reduceByKeyLocally(self, func): """ - Merge the values for each key using an associative reduce function, but + Merge the values for each key using an associative and commutative reduce function, but return the results immediately to the master as a dictionary. This will also perform the merging locally on each mapper before http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/python/pyspark/streaming/dstream.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 86447f5..2056663 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -453,7 +453,7 @@ class DStream(object): 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) This is more efficient than `invReduceFunc` is None. - @param reduceFunc: associative reduce function + @param reduceFunc: associative and commutative reduce function @param invReduceFunc: inverse reduce function of `reduceFunc` @param windowDuration: width of the window; must be a multiple of this DStream's batching interval @@ -524,7 +524,7 @@ class DStream(object): `invFunc` can be None, then it will reduce all the RDDs in window, could be slower than having `invFunc`. 
- @param func: associative reduce function + @param func: associative and commutative reduce function @param invFunc: inverse function of `reduceFunc` @param windowDuration: width of the window; must be a multiple of this DStream's batching interval http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index f10de48..9931a46 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -214,7 +214,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD has a single element generated by reducing all * elements in a sliding window over this DStream. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -234,7 +234,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD has a single element generated by reducing all * elements in a sliding window over this DStream. 
- * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -257,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient than reduceByWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param invReduceFunc inverse reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index d718f1d..aad9a12 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -138,8 +138,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the associative reduce function. Hash partitioning is used to generate the RDDs - * with Spark's default number of partitions. + * merged using the associative and commutative reduce function. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. 
*/ def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = dstream.reduceByKey(func) @@ -257,7 +257,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate * the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ @@ -270,7 +270,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -289,7 +289,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. 
- * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -309,7 +309,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to * `DStream.reduceByKey()`, but applies it over a sliding window. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -335,7 +335,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param invReduceFunc inverse function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -360,7 +360,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. 
- * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param invReduceFunc inverse function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -397,7 +397,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param invReduceFunc inverse function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index db79eea..70e1d8a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -791,7 +791,7 @@ abstract class DStream[T: ClassTag] ( /** * Return a new DStream in which each RDD has a single element generated by reducing all * elements in a sliding window over this DStream. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -814,7 +814,7 @@ abstract class DStream[T: ClassTag] ( * 2. 
"inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient than reduceByWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param invReduceFunc inverse reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index babc722..1dcdb64 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -75,8 +75,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the associative reduce function. Hash partitioning is used to generate the RDDs - * with Spark's default number of partitions. + * merged using the associative and commutative reduce function. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. */ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope { reduceByKey(reduceFunc, defaultPartitioner()) @@ -204,7 +204,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream * generates RDDs with the same interval as this DStream. 
Hash partitioning is used to generate * the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ @@ -219,7 +219,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -238,7 +238,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -259,7 +259,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) /** * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to * `DStream.reduceByKey()`, but applies it over a sliding window. 
- * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -289,7 +289,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param invReduceFunc inverse reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -320,7 +320,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative reduce function + * @param reduceFunc associative and commutative reduce function * @param invReduceFunc inverse reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval
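
Illustration (not part of the commit): the doc changes above ask for reduce functions that are commutative as well as associative. A minimal sketch of why that matters follows, in plain Python with no Spark dependency. It assumes that each partition is reduced in element order and that the partial results may then be merged in any order; the helper names `partitioned_reduce` and `distinct_results` and the `arrival_order` parameter are hypothetical and are not Spark APIs.

```python
import operator
from functools import reduce
from itertools import permutations


def partitioned_reduce(partitions, op, arrival_order):
    """Reduce each partition in element order, then merge the partial
    results in `arrival_order` (a hypothetical stand-in for the order
    in which partition results happen to arrive)."""
    partials = [reduce(op, part) for part in partitions]
    return reduce(op, (partials[i] for i in arrival_order))


def distinct_results(partitions, op):
    """Collect the results over every possible arrival order."""
    orders = permutations(range(len(partitions)))
    return {partitioned_reduce(partitions, op, order) for order in orders}


# Integer addition is associative and commutative.
int_results = distinct_results([[1, 2], [3, 4], [5]], operator.add)

# String concatenation is associative but not commutative.
str_results = distinct_results([["a", "b"], ["c", "d"], ["e"]], operator.add)

print(int_results)          # {15}
print(sorted(str_results))  # six different strings, one per arrival order
```

Under these assumptions addition gives one answer regardless of merge order, while concatenation gives a different string for each ordering of the three partial results.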