This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 14ddcfcc44f [SPARK-39716][R] Make
currentDatabase/setCurrentDatabase/listCatalogs in SparkR support 3L namespace
14ddcfcc44f is described below
commit 14ddcfcc44fa28b7473092a9567230022c4b01c7
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Jul 8 18:06:53 2022 +0800
[SPARK-39716][R] Make currentDatabase/setCurrentDatabase/listCatalogs in
SparkR support 3L namespace
### What changes were proposed in this pull request?
add currentCatalog/setCurrentCatalog/listCatalogs
### Why are the changes needed?
to support 3L namespace in SparkR
### Does this PR introduce _any_ user-facing change?
yes, new API added
### How was this patch tested?
added UT
Closes #37127 from zhengruifeng/r_3L_catalog.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
R/pkg/NAMESPACE | 3 ++
R/pkg/R/catalog.R | 60 +++++++++++++++++++++++++++++++++++
R/pkg/pkgdown/_pkgdown_template.yml | 5 ++-
R/pkg/tests/fulltests/test_sparkSQL.R | 11 +++++++
4 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 6e0557cff88..570f721ab41 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -474,9 +474,11 @@ export("as.DataFrame",
"createDataFrame",
"createExternalTable",
"createTable",
+ "currentCatalog",
"currentDatabase",
"dropTempTable",
"dropTempView",
+ "listCatalogs",
"listColumns",
"listDatabases",
"listFunctions",
@@ -493,6 +495,7 @@ export("as.DataFrame",
"refreshByPath",
"refreshTable",
"setCheckpointDir",
+ "setCurrentCatalog",
"setCurrentDatabase",
"spark.lapply",
"spark.addFile",
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
index 275737f804b..b10f73fb340 100644
--- a/R/pkg/R/catalog.R
+++ b/R/pkg/R/catalog.R
@@ -17,6 +17,66 @@
# catalog.R: SparkSession catalog functions
+#' Returns the current default catalog
+#'
+#' Returns the current default catalog.
+#'
+#' @return name of the current default catalog.
+#' @rdname currentCatalog
+#' @name currentCatalog
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' currentCatalog()
+#' }
+#' @note since 3.4.0
+currentCatalog <- function() {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ callJMethod(catalog, "currentCatalog")
+}
+
+#' Sets the current default catalog
+#'
+#' Sets the current default catalog.
+#'
+#' @param catalogName name of the catalog
+#' @rdname setCurrentCatalog
+#' @name setCurrentCatalog
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' setCurrentCatalog("spark_catalog")
+#' }
+#' @note since 3.4.0
+setCurrentCatalog <- function(catalogName) {
+ sparkSession <- getSparkSession()
+ if (class(catalogName) != "character") {
+ stop("catalogName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ invisible(handledCallJMethod(catalog, "setCurrentCatalog", catalogName))
+}
+
+#' Returns a list of catalog available
+#'
+#' Returns a list of catalog available.
+#'
+#' @return a SparkDataFrame of the list of catalog.
+#' @rdname listCatalogs
+#' @name listCatalogs
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' listCatalogs()
+#' }
+#' @note since 3.4.0
+listCatalogs <- function() {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ dataFrame(callJMethod(callJMethod(catalog, "listCatalogs"), "toDF"))
+}
+
#' (Deprecated) Create an external table
#'
#' Creates an external table based on the dataset in a data source,
diff --git a/R/pkg/pkgdown/_pkgdown_template.yml
b/R/pkg/pkgdown/_pkgdown_template.yml
index eeb676befbc..d487b51ec5d 100644
--- a/R/pkg/pkgdown/_pkgdown_template.yml
+++ b/R/pkg/pkgdown/_pkgdown_template.yml
@@ -261,9 +261,11 @@ reference:
- title: "SQL Catalog"
- contents:
+ - currentCatalog
- currentDatabase
- dropTempTable
- dropTempView
+ - listCatalogs
- listColumns
- listDatabases
- listFunctions
@@ -271,6 +273,8 @@ reference:
- refreshByPath
- refreshTable
- recoverPartitions
+ - setCurrentCatalog
+ - setCurrentDatabase
- tableNames
- tables
- uncacheTable
@@ -283,7 +287,6 @@ reference:
- getLocalProperty
- install.spark
- setCheckpointDir
- - setCurrentDatabase
- setJobDescription
- setJobGroup
- setLocalProperty
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R
b/R/pkg/tests/fulltests/test_sparkSQL.R
index b3218abb133..9586d8b45a5 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4011,6 +4011,17 @@ test_that("Collect on DataFrame when NAs exists at the
top of a timestamp column
expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt"))
})
+test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", {
+ expect_equal(currentCatalog(), "spark_catalog")
+ expect_error(setCurrentCatalog("spark_catalog"), NA)
+ expect_error(setCurrentCatalog("zxwtyswklpf"),
+ paste0("Error in setCurrentCatalog : ",
+
"org.apache.spark.sql.connector.catalog.CatalogNotFoundException: ",
+ "Catalog 'zxwtyswklpf' plugin class not found: ",
+ "spark.sql.catalog.zxwtyswklpf is not defined"))
+ catalogs <- collect(listCatalogs())
+})
+
test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
expect_equal(currentDatabase(), "default")
expect_error(setCurrentDatabase("default"), NA)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]