Repository: spark
Updated Branches:
  refs/heads/branch-2.1 59502bbcf -> ba2a5ada4


[SPARK-18788][SPARKR] Add API for getNumPartitions

## What changes were proposed in this pull request?

Add a `getNumPartitions` API for SparkDataFrame, with documentation noting that calling it converts the DataFrame into an RDD internally.
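
As a usage sketch (mirroring the new roxygen example and the updated tests):

```r
sparkR.session()
df <- createDataFrame(cars, numPartitions = 2)
getNumPartitions(df)  # returns 2
```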

## How was this patch tested?

Unit tests and manual tests.

Author: Felix Cheung <felixcheun...@hotmail.com>

Closes #16668 from felixcheung/rgetnumpartitions.

(cherry picked from commit 90817a6cd06068fa9f9ff77384a1fcba73b43006)
Signed-off-by: Felix Cheung <felixche...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba2a5ada
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba2a5ada
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba2a5ada

Branch: refs/heads/branch-2.1
Commit: ba2a5ada4825a9ca3e4e954a51574a2eede096a3
Parents: 59502bb
Author: Felix Cheung <felixcheun...@hotmail.com>
Authored: Thu Jan 26 21:06:39 2017 -0800
Committer: Felix Cheung <felixche...@apache.org>
Committed: Thu Jan 26 21:06:54 2017 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 23 ++++++++++++++++++++
 R/pkg/R/RDD.R                             | 30 +++++++++++++-------------
 R/pkg/R/generics.R                        |  8 +++++--
 R/pkg/R/pairRDD.R                         |  4 ++--
 R/pkg/inst/tests/testthat/test_rdd.R      | 10 ++++-----
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 14 ++++++------
 7 files changed, 59 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba2a5ada/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index c3ec3f4..8a19fd0 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -94,6 +94,7 @@ exportMethods("arrange",
               "freqItems",
               "gapply",
               "gapplyCollect",
+              "getNumPartitions",
               "group_by",
               "groupBy",
               "head",

http://git-wip-us.apache.org/repos/asf/spark/blob/ba2a5ada/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 48ac307..39e8376 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3422,3 +3422,26 @@ setMethod("randomSplit",
             }
             sapply(sdfs, dataFrame)
           })
+
+#' getNumPartitions
+#'
+#' Return the number of partitions
+#'
+#' @param x A SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases getNumPartitions,SparkDataFrame-method
+#' @rdname getNumPartitions
+#' @name getNumPartitions
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(cars, numPartitions = 2)
+#' getNumPartitions(df)
+#' }
+#' @note getNumPartitions since 2.1.1
+setMethod("getNumPartitions",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
+          })

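The new method is a thin JVM call chain: it asks the underlying Java DataFrame
(x@sdf) for its RDD, then asks that RDD for its partition count. A rough
R-level equivalent via the internal RDD API (a sketch, assuming a SparkR
session; toRDD and getNumPartitionsRDD are not exported):

    df <- createDataFrame(cars, numPartitions = 2)
    # Same result as getNumPartitions(df), but goes through an R-side RDD wrapper
    SparkR:::getNumPartitionsRDD(SparkR:::toRDD(df))
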
http://git-wip-us.apache.org/repos/asf/spark/blob/ba2a5ada/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 0f1162f..91bab33 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -313,7 +313,7 @@ setMethod("checkpoint",
 #' @rdname getNumPartitions
 #' @aliases getNumPartitions,RDD-method
 #' @noRd
-setMethod("getNumPartitions",
+setMethod("getNumPartitionsRDD",
           signature(x = "RDD"),
           function(x) {
             callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             .Deprecated("getNumPartitions")
-            getNumPartitions(x)
+            getNumPartitionsRDD(x)
           })
 
 #' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
           signature(x = "RDD"),
           function(x) {
             ones <- lapply(x, function(item) { list(item, 1L) })
-            collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
+            collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
           })
 
 #' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
             resList <- list()
             index <- -1
             jrdd <- getJRDD(x)
-            numPartitions <- getNumPartitions(x)
+            numPartitions <- getNumPartitionsRDD(x)
             serializedModeRDD <- getSerializedMode(x)
 
             # TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
 #' @noRd
 setMethod("distinctRDD",
           signature(x = "RDD"),
-          function(x, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             identical.mapped <- lapply(x, function(x) { list(x, NULL) })
             reduced <- reduceByKey(identical.mapped,
                                    function(x, y) { x },
@@ -1053,7 +1053,7 @@ setMethod("coalesce",
            signature(x = "RDD", numPartitions = "numeric"),
            function(x, numPartitions, shuffle = FALSE) {
              numPartitions <- numToInt(numPartitions)
-             if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
+             if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
                func <- function(partIndex, part) {
                  set.seed(partIndex)  # partIndex as seed
                  start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1143,7 +1143,7 @@ setMethod("saveAsTextFile",
 #' @noRd
 setMethod("sortBy",
           signature(x = "RDD", func = "function"),
-          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             values(sortByKey(keyBy(x, func), ascending, numPartitions))
           })
 
@@ -1175,7 +1175,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
   resList <- list()
   index <- -1
   jrdd <- getJRDD(newRdd)
-  numPartitions <- getNumPartitions(newRdd)
+  numPartitions <- getNumPartitionsRDD(newRdd)
   serializedModeRDD <- getSerializedMode(newRdd)
 
   while (TRUE) {
@@ -1407,7 +1407,7 @@ setMethod("setName",
 setMethod("zipWithUniqueId",
           signature(x = "RDD"),
           function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)
 
             partitionFunc <- function(partIndex, part) {
               mapply(
@@ -1450,7 +1450,7 @@ setMethod("zipWithUniqueId",
 setMethod("zipWithIndex",
           signature(x = "RDD"),
           function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)
             if (n > 1) {
               nums <- collectRDD(lapplyPartition(x,
                                               function(part) {
@@ -1566,8 +1566,8 @@ setMethod("unionRDD",
 setMethod("zipRDD",
           signature(x = "RDD", other = "RDD"),
           function(x, other) {
-            n1 <- getNumPartitions(x)
-            n2 <- getNumPartitions(other)
+            n1 <- getNumPartitionsRDD(x)
+            n2 <- getNumPartitionsRDD(other)
             if (n1 != n2) {
               stop("Can only zip RDDs which have the same number of 
partitions.")
             }
@@ -1637,7 +1637,7 @@ setMethod("cartesian",
 #' @noRd
 setMethod("subtract",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             mapFunction <- function(e) { list(e, NA) }
             rdd1 <- map(x, mapFunction)
             rdd2 <- map(other, mapFunction)
@@ -1671,7 +1671,7 @@ setMethod("subtract",
 #' @noRd
 setMethod("intersection",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             rdd1 <- map(x, function(v) { list(v, NA) })
             rdd2 <- map(other, function(v) { list(v, NA) })
 
@@ -1714,7 +1714,7 @@ setMethod("zipPartitions",
             if (length(rrdds) == 1) {
               return(rrdds[[1]])
             }
-            nPart <- sapply(rrdds, getNumPartitions)
+            nPart <- sapply(rrdds, getNumPartitionsRDD)
             if (length(unique(nPart)) != 1) {
               stop("Can only zipPartitions RDDs which have the same number of 
partitions.")
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba2a5ada/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 499c7b2..c6a324c 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -138,9 +138,9 @@ setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
 # @export
 setGeneric("name", function(x) { standardGeneric("name") })
 
-# @rdname getNumPartitions
+# @rdname getNumPartitionsRDD
 # @export
-setGeneric("getNumPartitions", function(x) { 
standardGeneric("getNumPartitions") })
+setGeneric("getNumPartitionsRDD", function(x) { 
standardGeneric("getNumPartitionsRDD") })
 
 # @rdname getNumPartitions
 # @export
@@ -492,6 +492,10 @@ setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
 #' @export
 setGeneric("gapplyCollect", function(x, ...) { 
standardGeneric("gapplyCollect") })
 
+# @rdname getNumPartitions
+# @export
+setGeneric("getNumPartitions", function(x) { 
standardGeneric("getNumPartitions") })
+
 #' @rdname summary
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })

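Note the split above: the RDD-facing generic is renamed to getNumPartitionsRDD
so that the exported getNumPartitions name can be owned by the new
SparkDataFrame generic. The two then dispatch independently (a sketch,
assuming a SparkR session; the RDD variant stays internal):

    getNumPartitions(df)               # exported, dispatches on SparkDataFrame
    SparkR:::getNumPartitionsRDD(rdd)  # internal, dispatches on RDD
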
http://git-wip-us.apache.org/repos/asf/spark/blob/ba2a5ada/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 4dee324..8fa21be 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -780,7 +780,7 @@ setMethod("cogroup",
 #' @noRd
 setMethod("sortByKey",
           signature(x = "RDD"),
-          function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             rangeBounds <- list()
 
             if (numPartitions > 1) {
@@ -850,7 +850,7 @@ setMethod("sortByKey",
 #' @noRd
 setMethod("subtractByKey",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             filterFunction <- function(elem) {
               iters <- elem[[2]]
               (length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/ba2a5ada/R/pkg/inst/tests/testthat/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index 2c41a6b..ceb31bd 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -29,8 +29,8 @@ intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
 intRdd <- parallelize(sc, intPairs, 2L)
 
 test_that("get number of partitions in RDD", {
-  expect_equal(getNumPartitions(rdd), 2)
-  expect_equal(getNumPartitions(intRdd), 2)
+  expect_equal(getNumPartitionsRDD(rdd), 2)
+  expect_equal(getNumPartitionsRDD(intRdd), 2)
 })
 
 test_that("first on RDD", {
@@ -305,18 +305,18 @@ test_that("repartition/coalesce on RDDs", {
 
   # repartition
   r1 <- repartitionRDD(rdd, 2)
-  expect_equal(getNumPartitions(r1), 2L)
+  expect_equal(getNumPartitionsRDD(r1), 2L)
   count <- length(collectPartition(r1, 0L))
   expect_true(count >= 8 && count <= 12)
 
   r2 <- repartitionRDD(rdd, 6)
-  expect_equal(getNumPartitions(r2), 6L)
+  expect_equal(getNumPartitionsRDD(r2), 6L)
   count <- length(collectPartition(r2, 0L))
   expect_true(count >= 0 && count <= 4)
 
   # coalesce
   r3 <- coalesce(rdd, 1)
-  expect_equal(getNumPartitions(r3), 1L)
+  expect_equal(getNumPartitionsRDD(r3), 1L)
   count <- length(collectPartition(r3, 0L))
   expect_equal(count, 20)
 })

http://git-wip-us.apache.org/repos/asf/spark/blob/ba2a5ada/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 1f9daf5..2d0439e 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -196,18 +196,18 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
   expect_equal(as.list(collect(where(df, df$name == "John"))),
                list(name = "John", age = 19L, height = 176.5))
-  expect_equal(getNumPartitions(toRDD(df)), 1)
+  expect_equal(getNumPartitions(df), 1)
 
   df <- as.DataFrame(cars, numPartitions = 2)
-  expect_equal(getNumPartitions(toRDD(df)), 2)
+  expect_equal(getNumPartitions(df), 2)
   df <- createDataFrame(cars, numPartitions = 3)
-  expect_equal(getNumPartitions(toRDD(df)), 3)
+  expect_equal(getNumPartitions(df), 3)
   # validate limit by num of rows
   df <- createDataFrame(cars, numPartitions = 60)
-  expect_equal(getNumPartitions(toRDD(df)), 50)
+  expect_equal(getNumPartitions(df), 50)
   # validate when 1 < (length(coll) / numSlices) << length(coll)
   df <- createDataFrame(cars, numPartitions = 20)
-  expect_equal(getNumPartitions(toRDD(df)), 20)
+  expect_equal(getNumPartitions(df), 20)
 
   df <- as.DataFrame(data.frame(0))
   expect_is(df, "SparkDataFrame")
@@ -215,7 +215,7 @@ test_that("create DataFrame from RDD", {
   expect_is(df, "SparkDataFrame")
   df <- as.DataFrame(data.frame(0), numPartitions = 2)
   # no data to partition, goes to 1
-  expect_equal(getNumPartitions(toRDD(df)), 1)
+  expect_equal(getNumPartitions(df), 1)
 
   setHiveContext(sc)
   sql("CREATE TABLE people (name string, age double, height float)")
@@ -234,7 +234,7 @@ test_that("createDataFrame uses files for large objects", {
   conf <- callJMethod(sparkSession, "conf")
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
   df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
-  expect_equal(getNumPartitions(toRDD(df)), 3)
+  expect_equal(getNumPartitions(df), 3)
 
   # Resetting the conf back to default value
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", 
toString(.Machine$integer.max / 10))

