Repository: spark
Updated Branches:
  refs/heads/master b634901bb -> cc4d5229c


[SPARK-12625][SPARKR][SQL] replace R usage of Spark SQL deprecated API

rxin davies shivaram
Took the save mode handling from my PR #10480 and moved everything to writer
methods (see the sketch below). This is related to PR #10559.

- [x] it seems jsonRDD() is broken; need to investigate - this is not a public API though; will look into it some more tonight. (fixed)
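
For reference, a minimal sketch (not part of the patch) of the writer call
chain the R internals now go through - it mirrors
df.write().format(source).mode(saveMode).save(path) on the JVM side, with
`df`, `source`, `mode`, and `path` as placeholders:

    write <- callJMethod(df@sdf, "write")          # obtain a DataFrameWriter
    write <- callJMethod(write, "format", source)  # e.g. "parquet"
    write <- callJMethod(write, "mode", convertToJSaveMode(mode))
    callJMethod(write, "save", path)               # write out to the path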

Author: felixcheung <felixcheun...@hotmail.com>

Closes #10584 from felixcheung/rremovedeprecated.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc4d5229
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc4d5229
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc4d5229

Branch: refs/heads/master
Commit: cc4d5229c98a589da76a4d5e5fdc5ea92385183b
Parents: b634901
Author: felixcheung <felixcheun...@hotmail.com>
Authored: Mon Jan 4 22:32:07 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Jan 4 22:32:07 2016 -0800

----------------------------------------------------------------------
 R/pkg/R/DataFrame.R                       | 33 +++++++++++++-------------
 R/pkg/R/SQLContext.R                      | 10 ++++----
 R/pkg/R/column.R                          |  2 +-
 R/pkg/R/utils.R                           |  9 +++++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  4 ++--
 dev/run-tests.py                          | 11 ++++-----
 6 files changed, 38 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc4d5229/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0cfa12b9..c126f9e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -458,7 +458,10 @@ setMethod("registerTempTable",
 setMethod("insertInto",
           signature(x = "DataFrame", tableName = "character"),
           function(x, tableName, overwrite = FALSE) {
-            callJMethod(x@sdf, "insertInto", tableName, overwrite)
+            jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
+            write <- callJMethod(x@sdf, "write")
+            write <- callJMethod(write, "mode", jmode)
+            callJMethod(write, "insertInto", tableName)
           })
 
 #' Cache
@@ -1948,18 +1951,15 @@ setMethod("write.df",
              source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
                                    "org.apache.spark.sql.parquet")
             }
-            allModes <- c("append", "overwrite", "error", "ignore")
-            # nolint start
-            if (!(mode %in% allModes)) {
-              stop('mode should be one of "append", "overwrite", "error", "ignore"')
-            }
-            # nolint end
-            jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+            jmode <- convertToJSaveMode(mode)
             options <- varargsToEnv(...)
             if (!is.null(path)) {
                 options[["path"]] <- path
             }
-            callJMethod(df@sdf, "save", source, jmode, options)
+            write <- callJMethod(df@sdf, "write")
+            write <- callJMethod(write, "format", source)
+            write <- callJMethod(write, "mode", jmode)
+            write <- callJMethod(write, "save", path)
           })
 
 #' @rdname write.df
@@ -2013,15 +2013,14 @@ setMethod("saveAsTable",
              source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
                                    "org.apache.spark.sql.parquet")
             }
-            allModes <- c("append", "overwrite", "error", "ignore")
-            # nolint start
-            if (!(mode %in% allModes)) {
-              stop('mode should be one of "append", "overwrite", "error", "ignore"')
-            }
-            # nolint end
-            jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+            jmode <- convertToJSaveMode(mode)
             options <- varargsToEnv(...)
-            callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
+
+            write <- callJMethod(df@sdf, "write")
+            write <- callJMethod(write, "format", source)
+            write <- callJMethod(write, "mode", jmode)
+            write <- callJMethod(write, "options", options)
+            callJMethod(write, "saveAsTable", tableName)
           })
 
 #' summary
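
The user-facing SparkR API above is unchanged by this refactor; only the JVM
calls underneath now go through DataFrameWriter. A rough usage sketch,
assuming an existing DataFrame `df` and a table named "people":

    # save to a path; mode is one of "append", "overwrite", "error", "ignore"
    write.df(df, path = "people.parquet", source = "parquet", mode = "overwrite")

    # append the rows of df into the existing "people" table
    insertInto(df, "people", overwrite = FALSE)

    # persist df as a new managed table, erroring out if it already exists
    saveAsTable(df, "people_copy", source = "parquet", mode = "error")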

http://git-wip-us.apache.org/repos/asf/spark/blob/cc4d5229/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 9243d70..ccc683d 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -256,9 +256,12 @@ jsonFile <- function(sqlContext, path) {
 
 # TODO: support schema
 jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
+  .Deprecated("read.json")
   rdd <- serializeToString(rdd)
   if (is.null(schema)) {
-    sdf <- callJMethod(sqlContext, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio)
+    read <- callJMethod(sqlContext, "read")
+    # samplingRatio is deprecated
+    sdf <- callJMethod(read, "json", callJMethod(getJRDD(rdd), "rdd"))
     dataFrame(sdf)
   } else {
     stop("not implemented")
@@ -289,10 +292,7 @@ read.parquet <- function(sqlContext, path) {
 # TODO: Implement saveasParquetFile and write examples for both
 parquetFile <- function(sqlContext, ...) {
   .Deprecated("read.parquet")
-  # Allow the user to have a more flexible definiton of the text file path
-  paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x)))
-  sdf <- callJMethod(sqlContext, "parquetFile", paths)
-  dataFrame(sdf)
+  read.parquet(sqlContext, unlist(list(...)))
 }
 
 #' SQL Query
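
For callers migrating off the deprecated readers, the replacements named in
the .Deprecated() messages look roughly like this (paths are placeholders):

    # before: df <- jsonFile(sqlContext, path); parquetFile(sqlContext, path)
    # after:
    df  <- read.json(sqlContext, "examples/people.json")
    df2 <- read.parquet(sqlContext, "examples/people.parquet")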

http://git-wip-us.apache.org/repos/asf/spark/blob/cc4d5229/R/pkg/R/column.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R
index 356bcee..3ffd9a9 100644
--- a/R/pkg/R/column.R
+++ b/R/pkg/R/column.R
@@ -209,7 +209,7 @@ setMethod("cast",
 setMethod("%in%",
           signature(x = "Column"),
           function(x, table) {
-            jc <- callJMethod(x@jc, "in", as.list(table))
+            jc <- callJMethod(x@jc, "isin", as.list(table))
             return(column(jc))
           })
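
Nothing changes for R users here: %in% on a Column keeps its behavior, only
the backing JVM call moves from the deprecated "in" to "isin". A small
sketch, assuming a DataFrame `df` with an "age" column:

    # keep rows whose age is 19 or 30
    teens_or_thirty <- filter(df, df$age %in% c(19, 30))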
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cc4d5229/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 43105aa..aa386e5 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -641,3 +641,12 @@ assignNewEnv <- function(data) {
 splitString <- function(input) {
   Filter(nzchar, unlist(strsplit(input, ",|\\s")))
 }
+
+convertToJSaveMode <- function(mode) {
+ allModes <- c("append", "overwrite", "error", "ignore")
+ if (!(mode %in% allModes)) {
+   stop('mode should be one of "append", "overwrite", "error", "ignore"')  # nolint
+ }
+ jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+ jmode
+}
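
convertToJSaveMode() is the internal helper now shared by insertInto,
write.df, and saveAsTable; a quick sketch of its behavior:

    jmode <- convertToJSaveMode("append")   # returns a JVM SaveMode reference
    convertToJSaveMode("upsert")            # stops with the error above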

http://git-wip-us.apache.org/repos/asf/spark/blob/cc4d5229/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9e5d0eb..ebe8faa 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -423,12 +423,12 @@ test_that("read/write json files", {
 test_that("jsonRDD() on a RDD with json string", {
   rdd <- parallelize(sc, mockLines)
   expect_equal(count(rdd), 3)
-  df <- jsonRDD(sqlContext, rdd)
+  df <- suppressWarnings(jsonRDD(sqlContext, rdd))
   expect_is(df, "DataFrame")
   expect_equal(count(df), 3)
 
   rdd2 <- flatMap(rdd, function(x) c(x, x))
-  df <- jsonRDD(sqlContext, rdd2)
+  df <- suppressWarnings(jsonRDD(sqlContext, rdd2))
   expect_is(df, "DataFrame")
   expect_equal(count(df), 6)
 })

http://git-wip-us.apache.org/repos/asf/spark/blob/cc4d5229/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index acc9450..8726889 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -425,13 +425,12 @@ def run_build_tests():
 
 
 def run_sparkr_tests():
-    # set_title_and_block("Running SparkR tests", "BLOCK_SPARKR_UNIT_TESTS")
+    set_title_and_block("Running SparkR tests", "BLOCK_SPARKR_UNIT_TESTS")
 
-    # if which("R"):
-    #     run_cmd([os.path.join(SPARK_HOME, "R", "run-tests.sh")])
-    # else:
-    #     print("Ignoring SparkR tests as R was not found in PATH")
-    pass
+    if which("R"):
+        run_cmd([os.path.join(SPARK_HOME, "R", "run-tests.sh")])
+    else:
+        print("Ignoring SparkR tests as R was not found in PATH")
 
 
 def parse_opts():


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
