Repository: spark
Updated Branches:
refs/heads/master 0166c7373 -> 66b204646
[SPARK-25446][R] Add schema_of_json() and schema_of_csv() to R
## What changes were proposed in this pull request?
This PR proposes to expose `schema_of_json` and `schema_of_csv` at R side.
**`schema_of_json`**:
```r
json <- '{"name":"Bob"}'
df <- sql("SELECT * FROM range(1)")
head(select(df, schema_of_json(json)))
```
```
schema_of_json({"name":"Bob"})
1 struct<name:string>
```
**`schema_of_csv`**:
```r
csv <- "Amsterdam,2018"
df <- sql("SELECT * FROM range(1)")
head(select(df, schema_of_csv(csv)))
```
```
schema_of_csv(Amsterdam,2018)
1 struct<_c0:string,_c1:int>
```
## How was this patch tested?
Manually tested, unit tests added, documentation manually built and verified.
Closes #22939 from HyukjinKwon/SPARK-25446.
Authored-by: hyukjinkwon <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66b20464
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66b20464
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66b20464
Branch: refs/heads/master
Commit: 66b2046462c0e93b2ca167728eba9f4d13a5a67c
Parents: 0166c73
Author: hyukjinkwon <[email protected]>
Authored: Fri Nov 30 10:29:30 2018 +0800
Committer: hyukjinkwon <[email protected]>
Committed: Fri Nov 30 10:29:30 2018 +0800
----------------------------------------------------------------------
R/pkg/NAMESPACE | 2 +
R/pkg/R/functions.R | 77 +++++++++++++++++++++++++++---
R/pkg/R/generics.R | 8 ++++
R/pkg/tests/fulltests/test_sparkSQL.R | 16 ++++++-
4 files changed, 94 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/66b20464/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cdeafdd..1f8ba0b 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -351,6 +351,8 @@ exportMethods("%<=>%",
"row_number",
"rpad",
"rtrim",
+ "schema_of_csv",
+ "schema_of_json",
"second",
"sha1",
"sha2",
http://git-wip-us.apache.org/repos/asf/spark/blob/66b20464/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index f72645a..f568a93 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -205,11 +205,18 @@ NULL
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' }
-#' @param ... additional argument(s). In \code{to_json}, \code{to_csv} and
\code{from_json},
-#' this contains additional named properties to control how it is
converted, accepts
-#' the same options as the JSON/CSV data source. Additionally
\code{to_json} supports
-#' the "pretty" option which enables pretty JSON generation. In
\code{arrays_zip},
-#' this contains additional Columns of arrays to be merged.
+#' @param ... additional argument(s).
+#' \itemize{
+#' \item \code{to_json}, \code{from_json} and \code{schema_of_json}:
this contains
+#' additional named properties to control how it is converted and
accepts the
+#' same options as the JSON data source.
+#' \item \code{to_json}: it supports the "pretty" option which
enables pretty
+#' JSON generation.
+#' \item \code{to_csv}, \code{from_csv} and \code{schema_of_csv}:
this contains
+#' additional named properties to control how it is converted and
accepts the
+#' same options as the CSV data source.
+#' \item \code{arrays_zip}, this contains additional Columns of
arrays to be merged.
+#' }
#' @name column_collection_functions
#' @rdname column_collection_functions
#' @family collection functions
@@ -1771,12 +1778,16 @@ setMethod("to_date",
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a JSON object
-#' df2 <- sql("SELECT map('name', 'Bob')) as people")
+#' df2 <- sql("SELECT map('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts an array of maps into a JSON array
#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as
people")
-#' df2 <- mutate(df2, people_json = to_json(df2$people))}
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts a map into a pretty JSON object
+#' df2 <- sql("SELECT map('name', 'Bob') as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people, pretty = TRUE))}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
function(x, ...) {
@@ -2286,6 +2297,32 @@ setMethod("from_json", signature(x = "Column", schema =
"characterOrstructType")
})
#' @details
+#' \code{schema_of_json}: Parses a JSON string and infers its schema in DDL
format.
+#'
+#' @rdname column_collection_functions
+#' @aliases schema_of_json schema_of_json,characterOrColumn-method
+#' @examples
+#'
+#' \dontrun{
+#' json <- "{\"name\":\"Bob\"}"
+#' df <- sql("SELECT * FROM range(1)")
+#' head(select(df, schema_of_json(json)))}
+#' @note schema_of_json since 3.0.0
+setMethod("schema_of_json", signature(x = "characterOrColumn"),
+ function(x, ...) {
+ if (class(x) == "character") {
+ col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
+ } else {
+ col <- x@jc
+ }
+ options <- varargsToStrEnv(...)
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "schema_of_json",
+ col, options)
+ column(jc)
+ })
+
+#' @details
#' \code{from_csv}: Parses a column containing a CSV string into a Column of
\code{structType}
#' with the specified \code{schema}.
#' If the string is unparseable, the Column will contain the value NA.
@@ -2316,6 +2353,32 @@ setMethod("from_csv", signature(x = "Column", schema =
"characterOrColumn"),
})
#' @details
+#' \code{schema_of_csv}: Parses a CSV string and infers its schema in DDL
format.
+#'
+#' @rdname column_collection_functions
+#' @aliases schema_of_csv schema_of_csv,characterOrColumn-method
+#' @examples
+#'
+#' \dontrun{
+#' csv <- "Amsterdam,2018"
+#' df <- sql("SELECT * FROM range(1)")
+#' head(select(df, schema_of_csv(csv)))}
+#' @note schema_of_csv since 3.0.0
+setMethod("schema_of_csv", signature(x = "characterOrColumn"),
+ function(x, ...) {
+ if (class(x) == "character") {
+ col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
+ } else {
+ col <- x@jc
+ }
+ options <- varargsToStrEnv(...)
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "schema_of_csv",
+ col, options)
+ column(jc)
+ })
+
+#' @details
#' \code{from_utc_timestamp}: This is a common function for databases
supporting TIMESTAMP WITHOUT
#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and
interprets it as a
#' timestamp in UTC, and renders that timestamp as a timestamp in the given
time zone.
http://git-wip-us.apache.org/repos/asf/spark/blob/66b20464/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index b2ca6e6..9d8c24c 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1206,6 +1206,14 @@ setGeneric("rpad", function(x, len, pad) {
standardGeneric("rpad") })
#' @name NULL
setGeneric("rtrim", function(x, trimString) { standardGeneric("rtrim") })
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("schema_of_csv", function(x, ...) {
standardGeneric("schema_of_csv") })
+
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("schema_of_json", function(x, ...) {
standardGeneric("schema_of_json") })
+
#' @rdname column_aggregate_functions
#' @name NULL
setGeneric("sd", function(x, na.rm = FALSE) { standardGeneric("sd") })
http://git-wip-us.apache.org/repos/asf/spark/blob/66b20464/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R
b/R/pkg/tests/fulltests/test_sparkSQL.R
index 77a29c9..0d5118c 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1620,14 +1620,20 @@ test_that("column functions", {
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)
- # Test from_csv()
+ # Test from_csv(), schema_of_csv()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
- # Test to_json(), from_json()
+ df <- as.DataFrame(list(list("col" = "1")))
+ c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
+ expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
+ c <- collect(select(df, schema_of_csv(lit("Amsterdam,2018"))))
+ expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
+
+ # Test to_json(), from_json(), schema_of_json()
df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name',
'Alice')) as people")
j <- collect(select(df, alias(to_json(df$people), "json")))
expect_equal(j[order(j$json), ][1],
"[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
@@ -1654,6 +1660,12 @@ test_that("column functions", {
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
}
+ df <- as.DataFrame(list(list("col" = "1")))
+ c <- collect(select(df, schema_of_json('{"name":"Bob"}')))
+ expect_equal(c[[1]], "struct<name:string>")
+ c <- collect(select(df, schema_of_json(lit('{"name":"Bob"}'))))
+ expect_equal(c[[1]], "struct<name:string>")
+
# Test to_json() supports arrays of primitive types and arrays
df <- sql("SELECT array(19, 42, 70) as age")
j <- collect(select(df, alias(to_json(df$age), "json")))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]