Repository: spark
Updated Branches:
refs/heads/master 8c2edf46d -> cb77a6689
[SPARK-21291][R] add R partitionBy API in DataFrame
## What changes were proposed in this pull request?
Add the R partitionBy API to write.df.
I didn't add bucketBy to write.df, because the last line of write.df calls save:
```
write <- handledCallJMethod(write, "save")
```
save doesn't support bucketBy right now, because it calls:
```
assertNotBucketed("save")
```
## How was this patch tested?
Add unit test in test_sparkSQL.R
Closes #22537 from huaxingao/spark-21291.
Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb77a668
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb77a668
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb77a668
Branch: refs/heads/master
Commit: cb77a6689137916e64bc5692b0c942e86ca1a0ea
Parents: 8c2edf4
Author: Huaxin Gao <[email protected]>
Authored: Wed Sep 26 09:37:44 2018 +0800
Committer: hyukjinkwon <[email protected]>
Committed: Wed Sep 26 09:37:44 2018 +0800
----------------------------------------------------------------------
R/pkg/R/DataFrame.R | 17 +++++++++++++++--
R/pkg/tests/fulltests/test_sparkSQL.R | 8 ++++++++
2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a1cb478..3469188 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2954,6 +2954,9 @@ setMethod("exceptAll",
#' @param source a name for external data source.
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#' save mode (it is 'error' by default)
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#' system. If specified, the output is laid out on the file system similar
+#' to Hive's partitioning scheme.
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
@@ -2965,13 +2968,13 @@ setMethod("exceptAll",
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
-#' write.df(df, "myfile", "parquet", "overwrite")
+#' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2"))
#' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE)
#' }
#' @note write.df since 1.4.0
setMethod("write.df",
signature(df = "SparkDataFrame"),
- function(df, path = NULL, source = NULL, mode = "error", ...) {
+ function(df, path = NULL, source = NULL, mode = "error", partitionBy = NULL, ...) {
if (!is.null(path) && !is.character(path)) {
stop("path should be character, NULL or omitted.")
}
@@ -2985,8 +2988,18 @@ setMethod("write.df",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
+ cols <- NULL
+ if (!is.null(partitionBy)) {
+ if (!all(sapply(partitionBy, function(c) is.character(c)))) {
+ stop("All partitionBy column names should be characters.")
+ }
+ cols <- as.list(partitionBy)
+ }
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
+ if (!is.null(cols)) {
+ write <- callJMethod(write, "partitionBy", cols)
+ }
write <- setWriteOptions(write, path = path, mode = mode, ...)
write <- handledCallJMethod(write, "save")
})
http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index a874bfb..50eff37 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2701,8 +2701,16 @@ test_that("read/write text files", {
expect_equal(colnames(df2), c("value"))
expect_equal(count(df2), count(df) * 2)
+ df3 <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
+ schema = c("key", "value"))
+ textPath3 <- tempfile(pattern = "textPath3", fileext = ".txt")
+ write.df(df3, textPath3, "text", mode = "overwrite", partitionBy = "key")
+ df4 <- read.df(textPath3, "text")
+ expect_equal(count(df3), count(df4))
+
unlink(textPath)
unlink(textPath2)
+ unlink(textPath3)
})
test_that("read/write text files - compression option", {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]