Repository: spark
Updated Branches:
  refs/heads/master 6e57d57b3 -> ebb77b2af


[SPARK-7033] [SPARKR] Clean usage of split. Use partition instead where applicable.

Author: Sun Rui <rui....@intel.com>

Closes #5628 from sun-rui/SPARK-7033 and squashes the following commits:

046bc9e [Sun Rui] Clean split usage in tests.
d531c86 [Sun Rui] [SPARK-7033][SPARKR] Clean usage of split. Use partition instead where applicable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebb77b2a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebb77b2a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebb77b2a

Branch: refs/heads/master
Commit: ebb77b2aff085e71906b5de9d266ded89051af82
Parents: 6e57d57
Author: Sun Rui <rui....@intel.com>
Authored: Fri Apr 24 11:00:19 2015 -0700
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Fri Apr 24 11:00:19 2015 -0700

----------------------------------------------------------------------
 R/pkg/R/RDD.R               | 36 ++++++++++++++++++------------------
 R/pkg/R/context.R           | 20 ++++++++++----------
 R/pkg/R/pairRDD.R           |  8 ++++----
 R/pkg/R/utils.R             |  2 +-
 R/pkg/inst/tests/test_rdd.R | 12 ++++++------
 5 files changed, 39 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ebb77b2a/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 1284313..cc09efb 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
     # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
     # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
   } else {
-    pipelinedFunc <- function(split, iterator) {
-      func(split, prev@func(split, iterator))
+    pipelinedFunc <- function(partIndex, part) {
+      func(partIndex, prev@func(partIndex, part))
     }
     .Object@func <- cleanClosure(pipelinedFunc)
     .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -306,7 +306,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             jrdd <- getJRDD(x)
-            partitions <- callJMethod(jrdd, "splits")
+            partitions <- callJMethod(jrdd, "partitions")
             callJMethod(partitions, "size")
           })
 
@@ -452,8 +452,8 @@ setMethod("countByValue",
 setMethod("lapply",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            func <- function(split, iterator) {
-              lapply(iterator, FUN)
+            func <- function(partIndex, part) {
+              lapply(part, FUN)
             }
             lapplyPartitionsWithIndex(X, func)
           })
@@ -538,8 +538,8 @@ setMethod("mapPartitions",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, 1:10, 5L)
-#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
-#'                                          split * Reduce("+", part) })
+#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
+#'                                          partIndex * Reduce("+", part) })
 #' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
 #'}
 #' @rdname lapplyPartitionsWithIndex
@@ -813,7 +813,7 @@ setMethod("distinct",
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
+#' rdd <- parallelize(sc, 1:10)
 #' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
 #' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
 #'}
@@ -825,14 +825,14 @@ setMethod("sampleRDD",
           function(x, withReplacement, fraction, seed) {
 
             # The sampler: takes a partition and returns its sampled version.
-            samplingFunc <- function(split, part) {
+            samplingFunc <- function(partIndex, part) {
               set.seed(seed)
               res <- vector("list", length(part))
               len <- 0
 
               # Discards some random values to ensure each partition has a
               # different random seed.
-              runif(split)
+              runif(partIndex)
 
               for (elem in part) {
                 if (withReplacement) {
@@ -989,8 +989,8 @@ setMethod("coalesce",
            function(x, numPartitions, shuffle = FALSE) {
              numPartitions <- numToInt(numPartitions)
              if (shuffle || numPartitions > SparkR::numPartitions(x)) {
-               func <- function(s, part) {
-                 set.seed(s)  # split as seed
+               func <- function(partIndex, part) {
+                 set.seed(partIndex)  # partIndex as seed
                  start <- as.integer(sample(numPartitions, 1) - 1)
                  lapply(seq_along(part),
                         function(i) {
@@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile",
 #' Save this RDD as a text file, using string representations of elements.
 #'
 #' @param x The RDD to save
-#' @param path The directory where the splits of the text file are saved
+#' @param path The directory where the partitions of the text file are saved
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
@@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId",
           function(x) {
             n <- numPartitions(x)
 
-            partitionFunc <- function(split, part) {
+            partitionFunc <- function(partIndex, part) {
               mapply(
                 function(item, index) {
-                  list(item, (index - 1) * n + split)
+                  list(item, (index - 1) * n + partIndex)
                 },
                 part,
                 seq_along(part),
@@ -1382,11 +1382,11 @@ setMethod("zipWithIndex",
               startIndices <- Reduce("+", nums, accumulate = TRUE)
             }
 
-            partitionFunc <- function(split, part) {
-              if (split == 0) {
+            partitionFunc <- function(partIndex, part) {
+              if (partIndex == 0) {
                 startIndex <- 0
               } else {
-                startIndex <- startIndices[[split]]
+                startIndex <- startIndices[[partIndex]]
               }
 
               mapply(

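For reference, the renamed first argument in the lapplyPartitionsWithIndex example above is used like this (a minimal sketch, assuming a local SparkR session from sparkR.init(); the expected output is the one given in the roxygen example in the diff):

    sc <- sparkR.init()
    rdd <- parallelize(sc, 1:10, 5L)
    # partIndex is the 0-based index of the partition; part is the list of
    # elements in that partition.
    prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
                                             partIndex * Reduce("+", part) })
    collect(prod, flatten = FALSE)  # 0, 7, 22, 45, 76
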
http://git-wip-us.apache.org/repos/asf/spark/blob/ebb77b2a/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index ebbb8fb..b4845b6 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -17,12 +17,12 @@
 
 # context.R: SparkContext driven functions
 
-getMinSplits <- function(sc, minSplits) {
-  if (is.null(minSplits)) {
+getMinPartitions <- function(sc, minPartitions) {
+  if (is.null(minPartitions)) {
     defaultParallelism <- callJMethod(sc, "defaultParallelism")
-    minSplits <- min(defaultParallelism, 2)
+    minPartitions <- min(defaultParallelism, 2)
   }
-  as.integer(minSplits)
+  as.integer(minPartitions)
 }
 
 #' Create an RDD from a text file.
@@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'  value is chosen based on available parallelism.
 #' @return RDD where each item is of type \code{character}
 #' @export
@@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
 #'  sc <- sparkR.init()
 #'  lines <- textFile(sc, "myfile.txt")
 #'}
-textFile <- function(sc, path, minSplits = NULL) {
+textFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")
 
-  jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
   # jrdd is of type JavaRDD[String]
   RDD(jrdd, "string")
 }
@@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'  value is chosen based on available parallelism.
 #' @return RDD containing serialized R objects.
 #' @seealso saveAsObjectFile
@@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
 #'  sc <- sparkR.init()
 #'  rdd <- objectFile(sc, "myfile")
 #'}
-objectFile <- function(sc, path, minSplits = NULL) {
+objectFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")
 
-  jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
   # Assume the RDD contains serialized R objects.
   RDD(jrdd, "byte")
 }

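The user-visible part of the context.R change is the argument name: textFile() and objectFile() now take minPartitions instead of minSplits. A minimal sketch of the new spelling (assuming a local SparkR session and an existing text file; the path and partition count here are placeholders):

    sc <- sparkR.init()
    # NULL (the default) uses min(defaultParallelism, 2); an explicit value
    # asks for at least that many partitions.
    lines <- textFile(sc, "myfile.txt", minPartitions = 4L)
    numPartitions(lines)
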
http://git-wip-us.apache.org/repos/asf/spark/blob/ebb77b2a/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 13efebc..f99b474 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -206,8 +206,8 @@ setMethod("partitionBy",
                                    get(name, .broadcastNames) })
             jrdd <- getJRDD(x)
 
-            # We create a PairwiseRRDD that extends RDD[(Array[Byte],
-            # Array[Byte])], where the key is the hashed split, the value is
+            # We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
+            # where the key is the target partition number, the value is
             # the content (key-val pairs).
             pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
                                        callJMethod(jrdd, "rdd"),
@@ -866,8 +866,8 @@ setMethod("sampleByKey",
             }
 
             # The sampler: takes a partition and returns its sampled version.
-            samplingFunc <- function(split, part) {
-              set.seed(bitwXor(seed, split))
+            samplingFunc <- function(partIndex, part) {
+              set.seed(bitwXor(seed, partIndex))
               res <- vector("list", length(part))
               len <- 0
 

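The reworded PairwiseRRDD comment describes what partitionBy produces: pairs keyed by their target partition number. A minimal sketch of partitionBy with a custom partition function, adapted from the package tests (assumes a SparkContext sc is already initialized):

    pairs <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
    # Route odd keys to partition 0 and even keys to partition 1.
    byParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
    parts <- partitionBy(pairs, 2L, byParity)
    # Tag each partition with its index to see where the pairs landed:
    collect(lapplyPartitionsWithIndex(parts,
              function(partIndex, part) { list(partIndex, part) }),
            flatten = FALSE)
    # partition 0 holds the odd keys (1, 3); partition 1 holds the even key (4)
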
http://git-wip-us.apache.org/repos/asf/spark/blob/ebb77b2a/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 23305d3..0e7b7bd 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -501,7 +501,7 @@ appendPartitionLengths <- function(x, other) {
 #   A result RDD.
 mergePartitions <- function(rdd, zip) {
   serializerMode <- getSerializedMode(rdd)
-  partitionFunc <- function(split, part) {
+  partitionFunc <- function(partIndex, part) {
     len <- length(part)
     if (len > 0) {
       if (serializerMode == "byte") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ebb77b2a/R/pkg/inst/tests/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
index 3ba7d17..d55af93 100644
--- a/R/pkg/inst/tests/test_rdd.R
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -105,8 +105,8 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
   rdd2 <- rdd
   for (i in 1:12)
     rdd2 <- lapplyPartitionsWithIndex(
-              rdd2, function(split, part) {
-                part <- as.list(unlist(part) * split + i)
+              rdd2, function(partIndex, part) {
+                part <- as.list(unlist(part) * partIndex + i)
               })
   rdd2 <- lapply(rdd2, function(x) x + x)
   actual <- collect(rdd2)
@@ -121,8 +121,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
   # PipelinedRDD
   rdd2 <- lapplyPartitionsWithIndex(
             rdd2,
-            function(split, part) {
-              part <- as.list(unlist(part) * split)
+            function(partIndex, part) {
+              part <- as.list(unlist(part) * partIndex)
             })
 
   cache(rdd2)
@@ -174,13 +174,13 @@ test_that("lapply with dependency", {
 })
 
 test_that("lapplyPartitionsWithIndex on RDDs", {
-  func <- function(splitIndex, part) { list(splitIndex, Reduce("+", part)) }
+  func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
   actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
   expect_equal(actual, list(list(0, 15), list(1, 40)))
 
   pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
   partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
-  mkTup <- function(splitIndex, part) { list(splitIndex, part) }
+  mkTup <- function(partIndex, part) { list(partIndex, part) }
   actual <- collect(lapplyPartitionsWithIndex(
                       partitionBy(pairsRDD, 2L, partitionByParity),
                       mkTup),


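The updated tests exercise the renamed argument through a chain of lapplyPartitionsWithIndex calls on a PipelinedRDD. A standalone sketch of that pattern (assumes sc from sparkR.init(); with this input, elements 1:5 typically land in partition 0 and 6:10 in partition 1):

    rdd <- parallelize(sc, 1:10, 2L)
    scaled <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
                as.list(unlist(part) * partIndex)
              })
    collect(scaled)  # partition 0 elements become 0; partition 1 elements are kept (* 1)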