This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 48d84d943 feat(r): support target catalog/schema for ingestion (#3852)
48d84d943 is described below
commit 48d84d9432900513b0c7132c4e32041174da2a81
Author: eitsupi <[email protected]>
AuthorDate: Mon Jan 5 10:48:40 2026 +0900
feat(r): support target catalog/schema for ingestion (#3852)
These experimental options were added to the C and Python bindings in
#1056, but were not exposed in the R bindings, so they are added now.
---
r/adbcdrivermanager/R/helpers.R | 12 +++++++++--
r/adbcdrivermanager/man/read_adbc.Rd | 8 +++++++-
r/adbcpostgresql/R/adbcpostgresql-package.R | 3 +++
r/adbcpostgresql/man/adbcpostgresql.Rd | 3 +++
.../tests/testthat/test-adbcpostgres-package.R | 24 ++++++++++++++++++++++
r/adbcsqlite/R/adbcsqlite-package.R | 3 +++
r/adbcsqlite/man/adbcsqlite.Rd | 3 +++
.../tests/testthat/test-adbcsqlite-package.R | 18 ++++++++++++++++
8 files changed, 71 insertions(+), 3 deletions(-)
diff --git a/r/adbcdrivermanager/R/helpers.R b/r/adbcdrivermanager/R/helpers.R
index e9688a749..85336d318 100644
--- a/r/adbcdrivermanager/R/helpers.R
+++ b/r/adbcdrivermanager/R/helpers.R
@@ -34,6 +34,8 @@
#' @param bind A data.frame, nanoarrow_array, or nanoarrow_array_stream of
#' bind parameters or NULL to skip the bind/prepare step.
#' @param temporary Use TRUE to create a table as a temporary table.
+#' @param catalog_name If not `NULL`, the catalog to create/locate the table in.
+#' @param db_schema_name If not `NULL`, the schema to create/locate the table in.
#' @param ... Passed to S3 methods.
#'
#' @return
@@ -69,7 +71,9 @@ execute_adbc <- function(db_or_con, query, ..., bind = NULL) {
#' @export
write_adbc <- function(tbl, db_or_con, target_table, ...,
mode = c("default", "create", "append", "replace",
"create_append"),
- temporary = FALSE) {
+ temporary = FALSE,
+ catalog_name = NULL,
+ db_schema_name = NULL) {
UseMethod("write_adbc", db_or_con)
}
@@ -113,7 +117,9 @@ execute_adbc.default <- function(db_or_con, query, ...,
bind = NULL, stream = NU
#' @export
write_adbc.default <- function(tbl, db_or_con, target_table, ...,
mode = c("default", "create", "append",
"replace", "create_append"),
- temporary = FALSE) {
+ temporary = FALSE,
+ catalog_name = NULL,
+ db_schema_name = NULL) {
assert_adbc(db_or_con, c("adbc_database", "adbc_connection"))
mode <- match.arg(mode)
@@ -127,6 +133,8 @@ write_adbc.default <- function(tbl, db_or_con,
target_table, ...,
stmt <- adbc_statement_init(
con,
adbc.ingest.target_table = target_table,
+ adbc.ingest.target_catalog = catalog_name,
+ adbc.ingest.target_db_schema = db_schema_name,
adbc.ingest.mode = if (!identical(mode, "default"))
paste0("adbc.ingest.mode.", mode),
adbc.ingest.temporary = if (temporary) "true"
)
diff --git a/r/adbcdrivermanager/man/read_adbc.Rd
b/r/adbcdrivermanager/man/read_adbc.Rd
index cd1bbfa8a..d4c5c7fff 100644
--- a/r/adbcdrivermanager/man/read_adbc.Rd
+++ b/r/adbcdrivermanager/man/read_adbc.Rd
@@ -16,7 +16,9 @@ write_adbc(
target_table,
...,
mode = c("default", "create", "append", "replace", "create_append"),
- temporary = FALSE
+ temporary = FALSE,
+ catalog_name = NULL,
+ db_schema_name = NULL
)
}
\arguments{
@@ -41,6 +43,10 @@ is not compatible or append otherwise), or \code{"default"}
(use the \code{adbc.
argument of \code{\link[=adbc_statement_init]{adbc_statement_init()}}). The
default is \code{"default"}.}
\item{temporary}{Use TRUE to create a table as a temporary table.}
+
+\item{catalog_name}{If not \code{NULL}, the catalog to create/locate the table in.}
+
+\item{db_schema_name}{If not \code{NULL}, the schema to create/locate the table in.}
}
\value{
\itemize{
diff --git a/r/adbcpostgresql/R/adbcpostgresql-package.R
b/r/adbcpostgresql/R/adbcpostgresql-package.R
index 19c3e6d3a..7b610e3fb 100644
--- a/r/adbcpostgresql/R/adbcpostgresql-package.R
+++ b/r/adbcpostgresql/R/adbcpostgresql-package.R
@@ -34,6 +34,7 @@ NULL
#' @param adbc.connection.autocommit Use FALSE to disable the default
#' autocommit behaviour.
#' @param adbc.ingest.target_table The name of the target table for a bulk insert.
+#' @param adbc.ingest.target_db_schema The schema of the table for a bulk insert.
#' @param adbc.ingest.mode Whether to create (the default) or append.
#'
#' @return An [adbcdrivermanager::adbc_driver()]
@@ -78,10 +79,12 @@ adbc_connection_init.adbcpostgresql_database <-
function(database, ...,
#' @export
adbc_statement_init.adbcpostgresql_connection <- function(connection, ...,
adbc.ingest.target_table
= NULL,
+
adbc.ingest.target_db_schema = NULL,
adbc.ingest.mode = NULL)
{
options <- list(
...,
adbc.ingest.target_table = adbc.ingest.target_table,
+ adbc.ingest.target_db_schema = adbc.ingest.target_db_schema,
adbc.ingest.mode = adbc.ingest.mode
)
diff --git a/r/adbcpostgresql/man/adbcpostgresql.Rd
b/r/adbcpostgresql/man/adbcpostgresql.Rd
index 5ab83626f..2688023fa 100644
--- a/r/adbcpostgresql/man/adbcpostgresql.Rd
+++ b/r/adbcpostgresql/man/adbcpostgresql.Rd
@@ -17,6 +17,7 @@ adbcpostgresql()
connection,
...,
adbc.ingest.target_table = NULL,
+ adbc.ingest.target_db_schema = NULL,
adbc.ingest.mode = NULL
)
}
@@ -38,6 +39,8 @@ autocommit behaviour.}
\item{adbc.ingest.target_table}{The name of the target table for a bulk insert.}
+\item{adbc.ingest.target_db_schema}{The schema of the table for a bulk insert.}
+
\item{adbc.ingest.mode}{Whether to create (the default) or append.}
}
\value{
diff --git a/r/adbcpostgresql/tests/testthat/test-adbcpostgres-package.R
b/r/adbcpostgresql/tests/testthat/test-adbcpostgres-package.R
index e787b8862..5816624c3 100644
--- a/r/adbcpostgresql/tests/testthat/test-adbcpostgres-package.R
+++ b/r/adbcpostgresql/tests/testthat/test-adbcpostgres-package.R
@@ -88,3 +88,27 @@ test_that("default options can open a database and execute a
query", {
adbcdrivermanager::adbc_statement_release(stmt)
})
+
+test_that("write_adbc() supports db_schema_name", {
+ test_db_uri <- Sys.getenv("ADBC_POSTGRESQL_TEST_URI", "")
+ skip_if(identical(test_db_uri, ""))
+
+ db <- adbc_database_init(adbcpostgresql(), uri = test_db_uri)
+ con <- adbc_connection_init(db)
+ on.exit({
+ adbcdrivermanager::adbc_connection_release(con)
+ adbcdrivermanager::adbc_database_release(db)
+ })
+
+ adbcdrivermanager::execute_adbc(con, "CREATE SCHEMA IF NOT EXISTS
testschema")
+ adbcdrivermanager::execute_adbc(con, "DROP TABLE IF EXISTS
testschema.df_schema")
+
+ df <- data.frame(x = as.double(1:3))
+ expect_identical(
+ adbcdrivermanager::write_adbc(df, con, "df_schema", db_schema_name =
"testschema", mode = "create"),
+ df
+ )
+
+ stream <- adbcdrivermanager::read_adbc(con, "SELECT * FROM
testschema.df_schema ORDER BY x")
+ expect_identical(as.data.frame(stream), df)
+})
diff --git a/r/adbcsqlite/R/adbcsqlite-package.R
b/r/adbcsqlite/R/adbcsqlite-package.R
index 8de0eb687..8a192dd65 100644
--- a/r/adbcsqlite/R/adbcsqlite-package.R
+++ b/r/adbcsqlite/R/adbcsqlite-package.R
@@ -33,6 +33,7 @@ NULL
#' @param adbc.connection.autocommit Use FALSE to disable the default
#' autocommit behaviour.
#' @param adbc.ingest.target_table The name of the target table for a bulk insert.
+#' @param adbc.ingest.target_catalog The catalog of the table for a bulk insert.
#' @param adbc.ingest.mode Whether to create (the default) or append.
#' @param adbc.sqlite.query.batch_rows The number of rows per batch to return.
#'
@@ -78,11 +79,13 @@ adbc_connection_init.adbcsqlite_database <-
function(database, ...,
#' @export
adbc_statement_init.adbcsqlite_connection <- function(connection, ...,
adbc.ingest.target_table
= NULL,
+
adbc.ingest.target_catalog = NULL,
adbc.ingest.mode = NULL,
adbc.sqlite.query.batch_rows = NULL) {
options <- list(
...,
adbc.ingest.target_table = adbc.ingest.target_table,
+ adbc.ingest.target_catalog = adbc.ingest.target_catalog,
adbc.ingest.mode = adbc.ingest.mode,
adbc.sqlite.query.batch_rows = adbc.sqlite.query.batch_rows
)
diff --git a/r/adbcsqlite/man/adbcsqlite.Rd b/r/adbcsqlite/man/adbcsqlite.Rd
index eea9d182d..4edf4c756 100644
--- a/r/adbcsqlite/man/adbcsqlite.Rd
+++ b/r/adbcsqlite/man/adbcsqlite.Rd
@@ -17,6 +17,7 @@ adbcsqlite()
connection,
...,
adbc.ingest.target_table = NULL,
+ adbc.ingest.target_catalog = NULL,
adbc.ingest.mode = NULL,
adbc.sqlite.query.batch_rows = NULL
)
@@ -38,6 +39,8 @@ autocommit behaviour.}
\item{adbc.ingest.target_table}{The name of the target table for a bulk insert.}
+\item{adbc.ingest.target_catalog}{The catalog of the table for a bulk insert.}
+
\item{adbc.ingest.mode}{Whether to create (the default) or append.}
\item{adbc.sqlite.query.batch_rows}{The number of rows per batch to return.}
diff --git a/r/adbcsqlite/tests/testthat/test-adbcsqlite-package.R
b/r/adbcsqlite/tests/testthat/test-adbcsqlite-package.R
index 249a7aef6..d090655d1 100644
--- a/r/adbcsqlite/tests/testthat/test-adbcsqlite-package.R
+++ b/r/adbcsqlite/tests/testthat/test-adbcsqlite-package.R
@@ -99,6 +99,24 @@ test_that("read/write/execute SQL work with sqlite
connections", {
adbcdrivermanager::adbc_database_release(db)
})
+test_that("write_adbc() supports catalog_name", {
+ db <- adbc_database_init(adbcsqlite())
+ con <- adbc_connection_init(db)
+ on.exit({
+ adbcdrivermanager::adbc_connection_release(con)
+ adbcdrivermanager::adbc_database_release(db)
+ })
+
+ df <- data.frame(x = as.double(1:3))
+ expect_identical(
+ adbcdrivermanager::write_adbc(df, con, "df_catalog", catalog_name =
"main", mode = "replace"),
+ df
+ )
+
+ stream <- adbcdrivermanager::read_adbc(con, "SELECT * from main.df_catalog")
+ expect_identical(as.data.frame(stream), df)
+})
+
test_that("write_adbc() with temporary = TRUE works with sqlite databases", {
skip_if_not(packageVersion("adbcdrivermanager") >= "0.6.0.9000")