This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c2f8f555dd7 [SPARK-39720][R] Implement tableExists/getTable in SparkR
for 3L namespace
c2f8f555dd7 is described below
commit c2f8f555dd7a067625455b66c207cbfd113a8e6e
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Jul 11 12:02:00 2022 +0800
[SPARK-39720][R] Implement tableExists/getTable in SparkR for 3L namespace
### What changes were proposed in this pull request?
1, Implement tableExists/getTable
2, Update the documents of
createTable/cacheTable/uncacheTable/refreshTable/recoverPartitions/listColumns
### Why are the changes needed?
for 3L namespace
### Does this PR introduce _any_ user-facing change?
yes, new method `tableExists`
### How was this patch tested?
updated UT
Closes #37133 from zhengruifeng/r_3L_tblname.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
R/pkg/NAMESPACE | 2 +
R/pkg/R/catalog.R | 87 +++++++++++++++++++++++++++++++++--
R/pkg/pkgdown/_pkgdown_template.yml | 2 +
R/pkg/tests/fulltests/test_sparkSQL.R | 46 ++++++++++++++----
4 files changed, 125 insertions(+), 12 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 570f721ab41..f5f60ecf134 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -478,6 +478,7 @@ export("as.DataFrame",
"currentDatabase",
"dropTempTable",
"dropTempView",
+ "getTable",
"listCatalogs",
"listColumns",
"listDatabases",
@@ -503,6 +504,7 @@ export("as.DataFrame",
"spark.getSparkFiles",
"sql",
"str",
+ "tableExists",
"tableToDF",
"tableNames",
"tables",
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
index b10f73fb340..8237ac26b33 100644
--- a/R/pkg/R/catalog.R
+++ b/R/pkg/R/catalog.R
@@ -118,6 +118,7 @@ createExternalTable <- function(tableName, path = NULL,
source = NULL, schema =
#'
#' @param tableName the qualified or unqualified name that designates a table.
If no database
#' identifier is provided, it refers to a table in the
current database.
+#' The table name can be fully qualified with catalog name
since 3.4.0.
#' @param path (optional) the path of files to load.
#' @param source (optional) the name of the data source.
#' @param schema (optional) the schema of the data required for some data
sources.
@@ -129,7 +130,7 @@ createExternalTable <- function(tableName, path = NULL,
source = NULL, schema =
#' sparkR.session()
#' df <- createTable("myjson", path="path/to/json", source="json", schema)
#'
-#' createTable("people", source = "json", schema = schema)
+#' createTable("spark_catalog.default.people", source = "json", schema =
schema)
#' insertInto(df, "people")
#' }
#' @name createTable
@@ -160,6 +161,7 @@ createTable <- function(tableName, path = NULL, source =
NULL, schema = NULL, ..
#'
#' @param tableName the qualified or unqualified name that designates a table.
If no database
#' identifier is provided, it refers to a table in the
current database.
+#' The table name can be fully qualified with catalog name
since 3.4.0.
#' @return SparkDataFrame
#' @rdname cacheTable
#' @examples
@@ -184,6 +186,7 @@ cacheTable <- function(tableName) {
#'
#' @param tableName the qualified or unqualified name that designates a table.
If no database
#' identifier is provided, it refers to a table in the
current database.
+#' The table name can be fully qualified with catalog name
since 3.4.0.
#' @return SparkDataFrame
#' @rdname uncacheTable
#' @examples
@@ -403,6 +406,78 @@ listTables <- function(databaseName = NULL) {
dataFrame(callJMethod(jdst, "toDF"))
}
+#' Checks if the table with the specified name exists.
+#'
+#' Checks if the table with the specified name exists.
+#'
+#' @param tableName name of the table, allowed to be qualified with catalog name
+#' @return TRUE if the table exists, FALSE otherwise.
+#' @rdname tableExists
+#' @name tableExists
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' tableExists("spark_catalog.default.myTable")
+#' }
+#' @note since 3.4.0
+tableExists <- function(tableName) {
+  sparkSession <- getSparkSession()
+  # Validate early: the JVM-side Catalog.tableExists expects a single string.
+  # is.character() is safe for objects whose class() has length > 1,
+  # where `class(x) != "character"` raises a condition-length error (R >= 4.2).
+  if (!is.character(tableName)) {
+    stop("tableName must be a string.")
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  callJMethod(catalog, "tableExists", tableName)
+}
+
+#' Get the table with the specified name
+#'
+#' Get the table with the specified name
+#'
+#' @param tableName the qualified or unqualified name that designates a table, allowed to be
+#'                  qualified with catalog name
+#' @return A named list with fields: name, catalog, namespace, description,
+#'         tableType, isTemporary. Fields that are null on the JVM side
+#'         (e.g. catalog/namespace of a temporary view) are returned as NA.
+#' @rdname getTable
+#' @name getTable
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' tbl <- getTable("spark_catalog.default.myTable")
+#' }
+#' @note since 3.4.0
+getTable <- function(tableName) {
+  sparkSession <- getSparkSession()
+  # is.character() avoids the length > 1 condition error that
+  # `class(x) != "character"` can raise for multi-class objects (R >= 4.2).
+  if (!is.character(tableName)) {
+    stop("tableName must be a string.")
+  }
+  catalog <- callJMethod(sparkSession, "catalog")
+  jtbl <- handledCallJMethod(catalog, "getTable", tableName)
+
+  # Map JVM nulls to NA so the returned list always has a stable shape.
+  orNA <- function(x) if (is.null(x)) NA else x
+
+  list(
+    name = callJMethod(jtbl, "name"),
+    catalog = orNA(callJMethod(jtbl, "catalog")),
+    namespace = orNA(callJMethod(jtbl, "namespace")),
+    description = orNA(callJMethod(jtbl, "description")),
+    tableType = callJMethod(jtbl, "tableType"),
+    isTemporary = callJMethod(jtbl, "isTemporary")
+  )
+}
+
#' Returns a list of columns for the given table/view in the specified database
#'
#' Returns a list of columns for the given table/view in the specified
database.
@@ -410,6 +485,8 @@ listTables <- function(databaseName = NULL) {
#' @param tableName the qualified or unqualified name that designates a
table/view. If no database
#' identifier is provided, it refers to a table/view in the
current database.
#' If \code{databaseName} parameter is specified, this must
be an unqualified name.
+#' The table name can be qualified with catalog name since
3.4.0, when databaseName
+#' is NULL.
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of column descriptions.
#' @rdname listColumns
@@ -417,7 +494,7 @@ listTables <- function(databaseName = NULL) {
#' @examples
#' \dontrun{
#' sparkR.session()
-#' listColumns("mytable")
+#' listColumns("spark_catalog.default.mytable")
#' }
#' @note since 2.2.0
listColumns <- function(tableName, databaseName = NULL) {
@@ -470,12 +547,13 @@ listFunctions <- function(databaseName = NULL) {
#'
#' @param tableName the qualified or unqualified name that designates a table.
If no database
#' identifier is provided, it refers to a table in the
current database.
+#' The table name can be fully qualified with catalog name
since 3.4.0.
#' @rdname recoverPartitions
#' @name recoverPartitions
#' @examples
#' \dontrun{
#' sparkR.session()
-#' recoverPartitions("myTable")
+#' recoverPartitions("spark_catalog.default.myTable")
#' }
#' @note since 2.2.0
recoverPartitions <- function(tableName) {
@@ -496,12 +574,13 @@ recoverPartitions <- function(tableName) {
#'
#' @param tableName the qualified or unqualified name that designates a table.
If no database
#' identifier is provided, it refers to a table in the
current database.
+#' The table name can be fully qualified with catalog name
since 3.4.0.
#' @rdname refreshTable
#' @name refreshTable
#' @examples
#' \dontrun{
#' sparkR.session()
-#' refreshTable("myTable")
+#' refreshTable("spark_catalog.default.myTable")
#' }
#' @note since 2.2.0
refreshTable <- function(tableName) {
diff --git a/R/pkg/pkgdown/_pkgdown_template.yml
b/R/pkg/pkgdown/_pkgdown_template.yml
index d487b51ec5d..a9107c1293e 100644
--- a/R/pkg/pkgdown/_pkgdown_template.yml
+++ b/R/pkg/pkgdown/_pkgdown_template.yml
@@ -265,6 +265,7 @@ reference:
- currentDatabase
- dropTempTable
- dropTempView
+ - getTable
- listCatalogs
- listColumns
- listDatabases
@@ -275,6 +276,7 @@ reference:
- recoverPartitions
- setCurrentCatalog
- setCurrentDatabase
+ - tableExists
- tableNames
- tables
- uncacheTable
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R
b/R/pkg/tests/fulltests/test_sparkSQL.R
index 9586d8b45a5..29a6c2580e4 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -696,16 +696,27 @@ test_that(
expect_true(dropTempView("dfView"))
})
-test_that("test cache, uncache and clearCache", {
- df <- read.json(jsonPath)
- createOrReplaceTempView(df, "table1")
- cacheTable("table1")
- uncacheTable("table1")
+test_that("test tableExists, cache, uncache and clearCache", {
+ schema <- structType(structField("name", "string"), structField("age",
"integer"),
+ structField("height", "float"))
+ createTable("table1", source = "json", schema = schema)
+
+ cacheTable("default.table1")
+ uncacheTable("spark_catalog.default.table1")
clearCache()
- expect_true(dropTempView("table1"))
expect_error(uncacheTable("zxwtyswklpf"),
"Error in uncacheTable : analysis error - Table or view not found:
zxwtyswklpf")
+
+ expect_true(tableExists("table1"))
+ expect_true(tableExists("default.table1"))
+ expect_true(tableExists("spark_catalog.default.table1"))
+
+ sql("DROP TABLE IF EXISTS spark_catalog.default.table1")
+
+ expect_false(tableExists("table1"))
+ expect_false(tableExists("default.table1"))
+ expect_false(tableExists("spark_catalog.default.table1"))
})
test_that("insertInto() on a registered table", {
@@ -1342,7 +1353,7 @@ test_that("test HiveContext", {
schema <- structType(structField("name", "string"), structField("age",
"integer"),
structField("height", "float"))
- createTable("people", source = "json", schema = schema)
+ createTable("spark_catalog.default.people", source = "json", schema =
schema)
df <- read.df(jsonPathNa, "json", schema)
insertInto(df, "people")
expect_equal(collect(sql("SELECT age from people WHERE name =
'Bob'"))$age, c(16))
@@ -4033,7 +4044,7 @@ test_that("catalog APIs, currentDatabase,
setCurrentDatabase, listDatabases", {
expect_equal(which(dbs[, 1] == "default"), 1)
})
-test_that("catalog APIs, listTables, listColumns, listFunctions", {
+test_that("catalog APIs, listTables, listColumns, listFunctions, getTable", {
tb <- listTables()
count <- count(tables())
expect_equal(nrow(tb), count)
@@ -4075,7 +4086,26 @@ test_that("catalog APIs, listTables, listColumns,
listFunctions", {
expect_error(refreshTable("cars"), NA)
expect_error(refreshByPath("/"), NA)
+ view <- getTable("cars")
+ expect_equal(view$name, "cars")
+ expect_equal(view$tableType, "TEMPORARY")
+ expect_true(view$isTemporary)
+
dropTempView("cars")
+
+ schema <- structType(structField("name", "string"), structField("age",
"integer"),
+ structField("height", "float"))
+ createTable("default.people", source = "json", schema = schema)
+
+ tbl <- getTable("spark_catalog.default.people")
+ expect_equal(tbl$name, "people")
+ expect_equal(tbl$catalog, "spark_catalog")
+ expect_equal(length(tbl$namespace), 1)
+ expect_equal(tbl$namespace[[1]], "default")
+ expect_equal(tbl$tableType, "MANAGED")
+ expect_false(tbl$isTemporary)
+
+ sql("DROP TABLE IF EXISTS people")
})
test_that("assert_true, raise_error", {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]