Repository: spark Updated Branches: refs/heads/master aee1420ec -> b0f2fb5b9
[SPARK-16053][R] Add `spark_partition_id` in SparkR ## What changes were proposed in this pull request? This PR adds `spark_partition_id` virtual column function in SparkR for API parity. The following is just an example to illustrate a SparkR usage on a partitioned parquet table created by `spark.range(10).write.mode("overwrite").parquet("/tmp/t1")`. ```r > collect(select(read.parquet('/tmp/t1'), c('id', spark_partition_id()))) id SPARK_PARTITION_ID() 1 3 0 2 4 0 3 8 1 4 9 1 5 0 2 6 1 3 7 2 4 8 5 5 9 6 6 10 7 7 ``` ## How was this patch tested? Pass the Jenkins tests (including new testcase). Author: Dongjoon Hyun <dongj...@apache.org> Closes #13768 from dongjoon-hyun/SPARK-16053. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0f2fb5b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0f2fb5b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0f2fb5b Branch: refs/heads/master Commit: b0f2fb5b9729b38744bf784f2072f5ee52314f87 Parents: aee1420 Author: Dongjoon Hyun <dongj...@apache.org> Authored: Mon Jun 20 13:41:03 2016 -0700 Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu> Committed: Mon Jun 20 13:41:03 2016 -0700 ---------------------------------------------------------------------- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 21 +++++++++++++++++++++ R/pkg/R/generics.R | 4 ++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 1 + 4 files changed, 27 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b0f2fb5b/R/pkg/NAMESPACE ---------------------------------------------------------------------- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index aaeab66..45663f4 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -260,6 +260,7 @@ exportMethods("%in%", "skewness", "sort_array", "soundex", + "spark_partition_id", "stddev", "stddev_pop", "stddev_samp", http://git-wip-us.apache.org/repos/asf/spark/blob/b0f2fb5b/R/pkg/R/functions.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 0fb38bc..c26f963 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -1206,6 +1206,27 @@ setMethod("soundex", column(jc) }) +#' Return the partition ID as a column +#' +#' Return the partition ID of the Spark task as a SparkDataFrame column. +#' Note that this is nondeterministic because it depends on data partitioning and +#' task scheduling. +#' +#' This is equivalent to the SPARK_PARTITION_ID function in SQL. +#' +#' @rdname spark_partition_id +#' @name spark_partition_id +#' @export +#' @examples +#' \dontrun{select(df, spark_partition_id())} +#' @note spark_partition_id since 2.0.0 +setMethod("spark_partition_id", + signature(x = "missing"), + function() { + jc <- callJStatic("org.apache.spark.sql.functions", "spark_partition_id") + column(jc) + }) + #' @rdname sd #' @name stddev setMethod("stddev", http://git-wip-us.apache.org/repos/asf/spark/blob/b0f2fb5b/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index dcc1cf2..f6b9276 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1135,6 +1135,10 @@ setGeneric("sort_array", function(x, asc = TRUE) { standardGeneric("sort_array") #' @export setGeneric("soundex", function(x) { standardGeneric("soundex") }) +#' @rdname spark_partition_id +#' @export +setGeneric("spark_partition_id", function(x) { standardGeneric("spark_partition_id") }) + #' @rdname sd #' @export setGeneric("stddev", function(x) { standardGeneric("stddev") }) http://git-wip-us.apache.org/repos/asf/spark/blob/b0f2fb5b/R/pkg/inst/tests/testthat/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 114fec6..d53c40d 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1059,6 +1059,7 @@ test_that("column functions", { c16 <- is.nan(c) + isnan(c) + isNaN(c) c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1") c18 <- covar_pop(c, c1) + covar_pop("c", "c1") + c19 <- spark_partition_id() # Test if base::is.nan() is exposed expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org