Repository: spark
Updated Branches:
  refs/heads/branch-2.0 dfa920204 -> 45c41aa33


[SPARK-16053][R] Add `spark_partition_id` in SparkR

## What changes were proposed in this pull request?

This PR adds `spark_partition_id` virtual column function in SparkR for API 
parity.

The following is just an example to illustrate a SparkR usage on a partitioned 
parquet table created by 
`spark.range(10).write.mode("overwrite").parquet("/tmp/t1")`.
```r
> collect(select(read.parquet('/tmp/t1'), c('id', spark_partition_id())))
   id SPARK_PARTITION_ID()
1   3                    0
2   4                    0
3   8                    1
4   9                    1
5   0                    2
6   1                    3
7   2                    4
8   5                    5
9   6                    6
10  7                    7
```

## How was this patch tested?

Pass the Jenkins tests (including new testcase).

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13768 from dongjoon-hyun/SPARK-16053.

(cherry picked from commit b0f2fb5b9729b38744bf784f2072f5ee52314f87)
Signed-off-by: Shivaram Venkataraman <shiva...@cs.berkeley.edu>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45c41aa3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45c41aa3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45c41aa3

Branch: refs/heads/branch-2.0
Commit: 45c41aa33b39bfc38b8615fde044356a590edcfb
Parents: dfa9202
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Mon Jun 20 13:41:03 2016 -0700
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Mon Jun 20 13:41:11 2016 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/functions.R                       | 21 +++++++++++++++++++++
 R/pkg/R/generics.R                        |  4 ++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  1 +
 4 files changed, 27 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/45c41aa3/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index aaeab66..45663f4 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -260,6 +260,7 @@ exportMethods("%in%",
               "skewness",
               "sort_array",
               "soundex",
+              "spark_partition_id",
               "stddev",
               "stddev_pop",
               "stddev_samp",

http://git-wip-us.apache.org/repos/asf/spark/blob/45c41aa3/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 0fb38bc..c26f963 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -1206,6 +1206,27 @@ setMethod("soundex",
             column(jc)
           })
 
+#' Return the partition ID as a column
+#'
+#' Return the partition ID of the Spark task as a SparkDataFrame column.
+#' Note that this is nondeterministic because it depends on data partitioning 
and
+#' task scheduling.
+#'
+#' This is equivalent to the SPARK_PARTITION_ID function in SQL.
+#'
+#' @rdname spark_partition_id
+#' @name spark_partition_id
+#' @export
+#' @examples
+#' \dontrun{select(df, spark_partition_id())}
+#' @note spark_partition_id since 2.0.0
+setMethod("spark_partition_id",
+          signature(x = "missing"),
+          function() {
+            jc <- callJStatic("org.apache.spark.sql.functions", 
"spark_partition_id")
+            column(jc)
+          })
+
 #' @rdname sd
 #' @name stddev
 setMethod("stddev",

http://git-wip-us.apache.org/repos/asf/spark/blob/45c41aa3/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index dcc1cf2..f6b9276 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1135,6 +1135,10 @@ setGeneric("sort_array", function(x, asc = TRUE) { 
standardGeneric("sort_array")
 #' @export
 setGeneric("soundex", function(x) { standardGeneric("soundex") })
 
+#' @rdname spark_partition_id
+#' @export
+setGeneric("spark_partition_id", function(x) { 
standardGeneric("spark_partition_id") })
+
 #' @rdname sd
 #' @export
 setGeneric("stddev", function(x) { standardGeneric("stddev") })

http://git-wip-us.apache.org/repos/asf/spark/blob/45c41aa3/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 114fec6..d53c40d 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1059,6 +1059,7 @@ test_that("column functions", {
   c16 <- is.nan(c) + isnan(c) + isNaN(c)
   c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", 
"c1")
   c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
+  c19 <- spark_partition_id()
 
   # Test if base::is.nan() is exposed
   expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to