This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 48d84d943 feat(r): support target catalog/schema for ingestion (#3852)
48d84d943 is described below

commit 48d84d9432900513b0c7132c4e32041174da2a81
Author: eitsupi <[email protected]>
AuthorDate: Mon Jan 5 10:48:40 2026 +0900

    feat(r): support target catalog/schema for ingestion (#3852)
    
    These experimental options were added to the C and Python bindings in
    #1056, but were not exposed in the R bindings, so they are added now.
---
 r/adbcdrivermanager/R/helpers.R                    | 12 +++++++++--
 r/adbcdrivermanager/man/read_adbc.Rd               |  8 +++++++-
 r/adbcpostgresql/R/adbcpostgresql-package.R        |  3 +++
 r/adbcpostgresql/man/adbcpostgresql.Rd             |  3 +++
 .../tests/testthat/test-adbcpostgres-package.R     | 24 ++++++++++++++++++++++
 r/adbcsqlite/R/adbcsqlite-package.R                |  3 +++
 r/adbcsqlite/man/adbcsqlite.Rd                     |  3 +++
 .../tests/testthat/test-adbcsqlite-package.R       | 18 ++++++++++++++++
 8 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/r/adbcdrivermanager/R/helpers.R b/r/adbcdrivermanager/R/helpers.R
index e9688a749..85336d318 100644
--- a/r/adbcdrivermanager/R/helpers.R
+++ b/r/adbcdrivermanager/R/helpers.R
@@ -34,6 +34,8 @@
 #' @param bind A data.frame, nanoarrow_array, or nanoarrow_array_stream of
 #'   bind parameters or NULL to skip the bind/prepare step.
 #' @param temporary Use TRUE to create a table as a temporary table.
+#' @param catalog_name If not `NULL`, the catalog to create/locate the table in.
+#' @param db_schema_name If not `NULL`, the schema to create/locate the table in.
 #' @param ... Passed to S3 methods.
 #'
 #' @return
@@ -69,7 +71,9 @@ execute_adbc <- function(db_or_con, query, ..., bind = NULL) {
 #' @export
 write_adbc <- function(tbl, db_or_con, target_table, ...,
                        mode = c("default", "create", "append", "replace", "create_append"),
-                       temporary = FALSE) {
+                       temporary = FALSE,
+                       catalog_name = NULL,
+                       db_schema_name = NULL) {
   UseMethod("write_adbc", db_or_con)
 }
 
@@ -113,7 +117,9 @@ execute_adbc.default <- function(db_or_con, query, ..., bind = NULL, stream = NU
 #' @export
 write_adbc.default <- function(tbl, db_or_con, target_table, ...,
                                mode = c("default", "create", "append", "replace", "create_append"),
-                               temporary = FALSE) {
+                               temporary = FALSE,
+                               catalog_name = NULL,
+                               db_schema_name = NULL) {
   assert_adbc(db_or_con, c("adbc_database", "adbc_connection"))
   mode <- match.arg(mode)
 
@@ -127,6 +133,8 @@ write_adbc.default <- function(tbl, db_or_con, target_table, ...,
   stmt <- adbc_statement_init(
     con,
     adbc.ingest.target_table = target_table,
+    adbc.ingest.target_catalog = catalog_name,
+    adbc.ingest.target_db_schema = db_schema_name,
     adbc.ingest.mode = if (!identical(mode, "default")) paste0("adbc.ingest.mode.", mode),
     adbc.ingest.temporary = if (temporary) "true"
   )
diff --git a/r/adbcdrivermanager/man/read_adbc.Rd b/r/adbcdrivermanager/man/read_adbc.Rd
index cd1bbfa8a..d4c5c7fff 100644
--- a/r/adbcdrivermanager/man/read_adbc.Rd
+++ b/r/adbcdrivermanager/man/read_adbc.Rd
@@ -16,7 +16,9 @@ write_adbc(
   target_table,
   ...,
   mode = c("default", "create", "append", "replace", "create_append"),
-  temporary = FALSE
+  temporary = FALSE,
+  catalog_name = NULL,
+  db_schema_name = NULL
 )
 }
 \arguments{
@@ -41,6 +43,10 @@ is not compatible or append otherwise), or \code{"default"} (use the \code{adbc.
 argument of \code{\link[=adbc_statement_init]{adbc_statement_init()}}). The default is \code{"default"}.}
 
 \item{temporary}{Use TRUE to create a table as a temporary table.}
+
+\item{catalog_name}{If not \code{NULL}, the catalog to create/locate the table in.}
+
+\item{db_schema_name}{If not \code{NULL}, the schema to create/locate the table in.}
 }
 \value{
 \itemize{
diff --git a/r/adbcpostgresql/R/adbcpostgresql-package.R b/r/adbcpostgresql/R/adbcpostgresql-package.R
index 19c3e6d3a..7b610e3fb 100644
--- a/r/adbcpostgresql/R/adbcpostgresql-package.R
+++ b/r/adbcpostgresql/R/adbcpostgresql-package.R
@@ -34,6 +34,7 @@ NULL
 #' @param adbc.connection.autocommit Use FALSE to disable the default
 #'   autocommit behaviour.
 #' @param adbc.ingest.target_table The name of the target table for a bulk insert.
+#' @param adbc.ingest.target_db_schema The schema of the table for a bulk insert.
 #' @param adbc.ingest.mode Whether to create (the default) or append.
 #'
 #' @return An [adbcdrivermanager::adbc_driver()]
@@ -78,10 +79,12 @@ adbc_connection_init.adbcpostgresql_database <- function(database, ...,
 #' @export
 adbc_statement_init.adbcpostgresql_connection <- function(connection, ...,
                                                       adbc.ingest.target_table = NULL,
+                                                      adbc.ingest.target_db_schema = NULL,
                                                       adbc.ingest.mode = NULL) {
   options <- list(
     ...,
     adbc.ingest.target_table = adbc.ingest.target_table,
+    adbc.ingest.target_db_schema = adbc.ingest.target_db_schema,
     adbc.ingest.mode = adbc.ingest.mode
   )
 
diff --git a/r/adbcpostgresql/man/adbcpostgresql.Rd b/r/adbcpostgresql/man/adbcpostgresql.Rd
index 5ab83626f..2688023fa 100644
--- a/r/adbcpostgresql/man/adbcpostgresql.Rd
+++ b/r/adbcpostgresql/man/adbcpostgresql.Rd
@@ -17,6 +17,7 @@ adbcpostgresql()
   connection,
   ...,
   adbc.ingest.target_table = NULL,
+  adbc.ingest.target_db_schema = NULL,
   adbc.ingest.mode = NULL
 )
 }
@@ -38,6 +39,8 @@ autocommit behaviour.}
 
 \item{adbc.ingest.target_table}{The name of the target table for a bulk insert.}
 
+\item{adbc.ingest.target_db_schema}{The schema of the table for a bulk insert.}
+
 \item{adbc.ingest.mode}{Whether to create (the default) or append.}
 }
 \value{
diff --git a/r/adbcpostgresql/tests/testthat/test-adbcpostgres-package.R b/r/adbcpostgresql/tests/testthat/test-adbcpostgres-package.R
index e787b8862..5816624c3 100644
--- a/r/adbcpostgresql/tests/testthat/test-adbcpostgres-package.R
+++ b/r/adbcpostgresql/tests/testthat/test-adbcpostgres-package.R
@@ -88,3 +88,27 @@ test_that("default options can open a database and execute a query", {
 
   adbcdrivermanager::adbc_statement_release(stmt)
 })
+
+test_that("write_adbc() supports db_schema_name", {
+  test_db_uri <- Sys.getenv("ADBC_POSTGRESQL_TEST_URI", "")
+  skip_if(identical(test_db_uri, ""))
+
+  db <- adbc_database_init(adbcpostgresql(), uri = test_db_uri)
+  con <- adbc_connection_init(db)
+  on.exit({
+    adbcdrivermanager::adbc_connection_release(con)
+    adbcdrivermanager::adbc_database_release(db)
+  })
+
+  adbcdrivermanager::execute_adbc(con, "CREATE SCHEMA IF NOT EXISTS testschema")
+  adbcdrivermanager::execute_adbc(con, "DROP TABLE IF EXISTS testschema.df_schema")
+
+  df <- data.frame(x = as.double(1:3))
+  expect_identical(
+    adbcdrivermanager::write_adbc(df, con, "df_schema", db_schema_name = "testschema", mode = "create"),
+    df
+  )
+
+  stream <- adbcdrivermanager::read_adbc(con, "SELECT * FROM testschema.df_schema ORDER BY x")
+  expect_identical(as.data.frame(stream), df)
+})
diff --git a/r/adbcsqlite/R/adbcsqlite-package.R b/r/adbcsqlite/R/adbcsqlite-package.R
index 8de0eb687..8a192dd65 100644
--- a/r/adbcsqlite/R/adbcsqlite-package.R
+++ b/r/adbcsqlite/R/adbcsqlite-package.R
@@ -33,6 +33,7 @@ NULL
 #' @param adbc.connection.autocommit Use FALSE to disable the default
 #'   autocommit behaviour.
 #' @param adbc.ingest.target_table The name of the target table for a bulk insert.
+#' @param adbc.ingest.target_catalog The catalog of the table for a bulk insert.
 #' @param adbc.ingest.mode Whether to create (the default) or append.
 #' @param adbc.sqlite.query.batch_rows The number of rows per batch to return.
 #'
@@ -78,11 +79,13 @@ adbc_connection_init.adbcsqlite_database <- function(database, ...,
 #' @export
 adbc_statement_init.adbcsqlite_connection <- function(connection, ...,
                                                       adbc.ingest.target_table = NULL,
+                                                      adbc.ingest.target_catalog = NULL,
                                                       adbc.ingest.mode = NULL,
                                                       adbc.sqlite.query.batch_rows = NULL) {
   options <- list(
     ...,
     adbc.ingest.target_table = adbc.ingest.target_table,
+    adbc.ingest.target_catalog = adbc.ingest.target_catalog,
     adbc.ingest.mode = adbc.ingest.mode,
     adbc.sqlite.query.batch_rows = adbc.sqlite.query.batch_rows
   )
diff --git a/r/adbcsqlite/man/adbcsqlite.Rd b/r/adbcsqlite/man/adbcsqlite.Rd
index eea9d182d..4edf4c756 100644
--- a/r/adbcsqlite/man/adbcsqlite.Rd
+++ b/r/adbcsqlite/man/adbcsqlite.Rd
@@ -17,6 +17,7 @@ adbcsqlite()
   connection,
   ...,
   adbc.ingest.target_table = NULL,
+  adbc.ingest.target_catalog = NULL,
   adbc.ingest.mode = NULL,
   adbc.sqlite.query.batch_rows = NULL
 )
@@ -38,6 +39,8 @@ autocommit behaviour.}
 
 \item{adbc.ingest.target_table}{The name of the target table for a bulk insert.}
 
+\item{adbc.ingest.target_catalog}{The catalog of the table for a bulk insert.}
+
 \item{adbc.ingest.mode}{Whether to create (the default) or append.}
 
 \item{adbc.sqlite.query.batch_rows}{The number of rows per batch to return.}
diff --git a/r/adbcsqlite/tests/testthat/test-adbcsqlite-package.R b/r/adbcsqlite/tests/testthat/test-adbcsqlite-package.R
index 249a7aef6..d090655d1 100644
--- a/r/adbcsqlite/tests/testthat/test-adbcsqlite-package.R
+++ b/r/adbcsqlite/tests/testthat/test-adbcsqlite-package.R
@@ -99,6 +99,24 @@ test_that("read/write/execute SQL work with sqlite connections", {
   adbcdrivermanager::adbc_database_release(db)
 })
 
+test_that("write_adbc() supports catalog_name", {
+  db <- adbc_database_init(adbcsqlite())
+  con <- adbc_connection_init(db)
+  on.exit({
+    adbcdrivermanager::adbc_connection_release(con)
+    adbcdrivermanager::adbc_database_release(db)
+  })
+
+  df <- data.frame(x = as.double(1:3))
+  expect_identical(
+    adbcdrivermanager::write_adbc(df, con, "df_catalog", catalog_name = "main", mode = "replace"),
+    df
+  )
+
+  stream <- adbcdrivermanager::read_adbc(con, "SELECT * from main.df_catalog")
+  expect_identical(as.data.frame(stream), df)
+})
+
 test_that("write_adbc() with temporary = TRUE works with sqlite databases", {
   skip_if_not(packageVersion("adbcdrivermanager") >= "0.6.0.9000")
 

Reply via email to