This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new cdbb9f8d feat(r/sedonadb): Port Python configuration options changes
to R (#658)
cdbb9f8d is described below
commit cdbb9f8df35c00443cf82114c9f67584bf528a4e
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Feb 24 15:11:50 2026 -0600
feat(r/sedonadb): Port Python configuration options changes to R (#658)
Co-authored-by: Copilot <[email protected]>
---
r/sedonadb/DESCRIPTION | 1 +
r/sedonadb/NAMESPACE | 6 +
r/sedonadb/R/000-wrappers.R | 20 ++--
r/sedonadb/R/context.R | 164 ++++++++++++++++++++++++--
r/sedonadb/R/dataframe.R | 109 +++++++++++++----
r/sedonadb/R/expression.R | 1 +
r/sedonadb/man/as_sedonadb_dataframe.Rd | 5 +-
r/sedonadb/man/sd_connect.Rd | 49 ++++++++
r/sedonadb/man/sd_drop_view.Rd | 8 ++
r/sedonadb/man/sd_expr_column.Rd | 2 +
r/sedonadb/man/sd_read_parquet.Rd | 5 +
r/sedonadb/man/sd_register_udf.Rd | 5 +
r/sedonadb/man/sd_sql.Rd | 5 +
r/sedonadb/man/sd_to_view.Rd | 4 +-
r/sedonadb/man/sd_write_parquet.Rd | 19 ++-
r/sedonadb/src/init.c | 19 +--
r/sedonadb/src/rust/api.h | 9 +-
r/sedonadb/src/rust/src/context.rs | 24 +++-
r/sedonadb/src/rust/src/dataframe.rs | 38 +++---
r/sedonadb/tests/testthat/_snaps/context.md | 8 ++
r/sedonadb/tests/testthat/_snaps/dataframe.md | 12 ++
r/sedonadb/tests/testthat/test-context.R | 47 ++++++++
r/sedonadb/tests/testthat/test-dataframe.R | 69 ++++++++++-
23 files changed, 555 insertions(+), 74 deletions(-)
diff --git a/r/sedonadb/DESCRIPTION b/r/sedonadb/DESCRIPTION
index 90f82b0b..ba3e94c9 100644
--- a/r/sedonadb/DESCRIPTION
+++ b/r/sedonadb/DESCRIPTION
@@ -14,6 +14,7 @@ Depends: R (>= 4.1.0)
Suggests:
adbcdrivermanager,
dplyr,
+ lintr,
rlang,
tibble,
sf,
diff --git a/r/sedonadb/NAMESPACE b/r/sedonadb/NAMESPACE
index c5c3be0f..35553123 100644
--- a/r/sedonadb/NAMESPACE
+++ b/r/sedonadb/NAMESPACE
@@ -38,7 +38,13 @@ export(sd_arrange)
export(sd_collect)
export(sd_compute)
export(sd_configure_proj)
+export(sd_connect)
export(sd_count)
+export(sd_ctx_drop_view)
+export(sd_ctx_read_parquet)
+export(sd_ctx_register_udf)
+export(sd_ctx_sql)
+export(sd_ctx_view)
export(sd_drop_view)
export(sd_expr_aggregate_function)
export(sd_expr_alias)
diff --git a/r/sedonadb/R/000-wrappers.R b/r/sedonadb/R/000-wrappers.R
index dbe4a405..f8884991 100644
--- a/r/sedonadb/R/000-wrappers.R
+++ b/r/sedonadb/R/000-wrappers.R
@@ -179,8 +179,12 @@ NULL
### associated functions for InternalContext
-`InternalContext`$`new` <- function() {
- .savvy_wrap_InternalContext(.Call(savvy_InternalContext_new__impl))
+`InternalContext`$`new` <- function(`option_keys`, `option_values`) {
+ .savvy_wrap_InternalContext(.Call(
+ savvy_InternalContext_new__impl,
+ `option_keys`,
+ `option_values`
+ ))
}
@@ -319,11 +323,11 @@ class(`InternalContext`) <- c(
function(
`ctx`,
`path`,
+ `option_keys`,
+ `option_values`,
`partition_by`,
`sort_by`,
- `single_file_output`,
- `overwrite_bbox_columns`,
- `geoparquet_version` = NULL
+ `single_file_output`
) {
`ctx` <- .savvy_extract_ptr(`ctx`, "sedonadb::InternalContext")
invisible(.Call(
@@ -331,11 +335,11 @@ class(`InternalContext`) <- c(
`self`,
`ctx`,
`path`,
+ `option_keys`,
+ `option_values`,
`partition_by`,
`sort_by`,
- `single_file_output`,
- `overwrite_bbox_columns`,
- `geoparquet_version`
+ `single_file_output`
))
}
}
diff --git a/r/sedonadb/R/context.R b/r/sedonadb/R/context.R
index 0eac883e..2384f5bb 100644
--- a/r/sedonadb/R/context.R
+++ b/r/sedonadb/R/context.R
@@ -15,10 +15,110 @@
# specific language governing permissions and limitations
# under the License.
+#' Create a SedonaDB context
+#'
+#' Runtime options configure the execution environment. Use
+#' `global = TRUE` to configure the global context or use the
+#' returned object as a scoped context. A scoped context is
+#' recommended for programmatic usage as it prevents named
+#' views from interfering with each other.
+#'
+#' @param global Use TRUE to set options on the global context.
+#' @param memory_limit Maximum memory for query execution, as a
+#' human-readable string (e.g., `"4gb"`, `"512m"`) or `NULL` for
+#' unbounded (the default).
+#' @param temp_dir Directory for temporary/spill files, or `NULL` to
+#' use the DataFusion default.
+#' @param memory_pool_type Memory pool type: `"greedy"` (default) or
+#' `"fair"`. Only takes effect when `memory_limit` is set.
+#' @param unspillable_reserve_ratio Fraction of memory (0--1) reserved for
+#' unspillable consumers. Only applies when `memory_pool_type` is
+#' `"fair"`. Defaults to 0.2 when not explicitly set.
+#' @param ... Reserved for future options
+#'
+#' @returns The constructed context, invisibly.
+#' @export
+#'
+#' @examples
+#' sd_connect(memory_limit = "100mb", memory_pool_type = "fair")
+#'
+#'
+sd_connect <- function(
+ ...,
+ global = FALSE,
+ memory_limit = NULL,
+ temp_dir = NULL,
+ memory_pool_type = NULL,
+ unspillable_reserve_ratio = NULL
+) {
+ unsupported_options <- list(...)
+ if (length(unsupported_options) != 0) {
+ warning(
+ sprintf(
+ "Unrecognized options for sd_connect(): %s",
+ paste(names(unsupported_options), collapse = ", ")
+ )
+ )
+ }
+
+ options <- list()
+
+ if (!is.null(memory_limit)) {
+ stopifnot(is.character(memory_limit), length(memory_limit) == 1L)
+ options[["memory_limit"]] <- memory_limit
+ }
+
+ if (!is.null(temp_dir)) {
+ stopifnot(is.character(temp_dir), length(temp_dir) == 1L)
+ options[["temp_dir"]] <- temp_dir
+ }
+
+ if (!is.null(memory_pool_type)) {
+ memory_pool_type <- match.arg(memory_pool_type, c("greedy", "fair"))
+ options[["memory_pool_type"]] <- memory_pool_type
+ }
+
+ if (!is.null(unspillable_reserve_ratio)) {
+ stopifnot(
+ is.numeric(unspillable_reserve_ratio),
+ length(unspillable_reserve_ratio) == 1L,
+ unspillable_reserve_ratio >= 0,
+ unspillable_reserve_ratio <= 1
+ )
+ options[["unspillable_reserve_ratio"]] <- as.character(
+ unspillable_reserve_ratio
+ )
+ }
+
+ # Don't replace the global context
+ if (global && length(options) > 0 && !is.null(global_ctx$ctx)) {
+ warning(
+ "Cannot change runtime options after the context has been initialized. ",
+ "Set global options with sd_connect() before executing your first query
",
+ "or use a scoped context by passing global = FALSE"
+ )
+
+ return(invisible(global_ctx$ctx))
+ } else if (global && !is.null(global_ctx$ctx)) {
+ return(invisible(global_ctx$ctx))
+ }
+
+ keys <- as.character(names(options))
+ values <- as.character(options)
+ ctx <- InternalContext$new(keys, values)
+
+ if (global) {
+ global_ctx$ctx <- ctx
+ }
+
+ invisible(ctx)
+}
+
#' Create a DataFrame from one or more Parquet files
#'
#' The query will only be executed when requested.
#'
+#' @param ctx A SedonaDB context.
#' @param path One or more paths or URIs to Parquet files
#'
#' @returns A sedonadb_dataframe
@@ -29,7 +129,13 @@
#' sd_read_parquet(path) |> head(5) |> sd_preview()
#'
sd_read_parquet <- function(path) {
- ctx <- ctx()
+ sd_ctx_read_parquet(ctx(), path)
+}
+
+#' @rdname sd_read_parquet
+#' @export
+sd_ctx_read_parquet <- function(ctx, path) {
+ check_ctx(ctx)
df <- ctx$read_parquet(path)
new_sedonadb_dataframe(ctx, df)
}
@@ -38,6 +144,7 @@ sd_read_parquet <- function(path) {
#'
#' The query will only be executed when requested.
#'
+#' @param ctx A SedonaDB context.
#' @param sql A SQL string to execute
#'
#' @returns A sedonadb_dataframe
@@ -47,7 +154,13 @@ sd_read_parquet <- function(path) {
#' sd_sql("SELECT ST_Point(0, 1) as geom") |> sd_preview()
#'
sd_sql <- function(sql) {
- ctx <- ctx()
+ sd_ctx_sql(ctx(), sql)
+}
+
+#' @rdname sd_sql
+#' @export
+sd_ctx_sql <- function(ctx, sql) {
+ check_ctx(ctx)
df <- ctx$sql(sql)
new_sedonadb_dataframe(ctx, df)
}
@@ -56,6 +169,7 @@ sd_sql <- function(sql) {
#'
#' Remove a view created with [sd_to_view()] from the context.
#'
+#' @param ctx A SedonaDB context.
#' @param table_ref The name of the view reference
#' @returns The context, invisibly
#' @export
@@ -67,7 +181,13 @@ sd_sql <- function(sql) {
#' try(sd_view("foofy"))
#'
sd_drop_view <- function(table_ref) {
- ctx <- ctx()
+ sd_ctx_drop_view(ctx(), table_ref)
+}
+
+#' @rdname sd_drop_view
+#' @export
+sd_ctx_drop_view <- function(ctx, table_ref) {
+ check_ctx(ctx)
ctx$deregister_table(table_ref)
invisible(ctx)
}
@@ -75,7 +195,13 @@ sd_drop_view <- function(table_ref) {
#' @rdname sd_drop_view
#' @export
sd_view <- function(table_ref) {
- ctx <- ctx()
+ sd_ctx_view(ctx(), table_ref)
+}
+
+#' @rdname sd_drop_view
+#' @export
+sd_ctx_view <- function(ctx, table_ref) {
+ check_ctx(ctx)
df <- ctx$view(table_ref)
new_sedonadb_dataframe(ctx, df)
}
@@ -88,28 +214,48 @@ sd_view <- function(table_ref) {
#' to a Rust `FFI_ScalarUDF`, an example of which is available from the
#' [DataFusion Python
documentation](https://github.com/apache/datafusion-python/blob/6f3b1cab75cfaa0cdf914f9b6fa023cb9afccd7d/examples/datafusion-ffi-example/src/scalar_udf.rs).
#'
+#' @param ctx A SedonaDB context.
#' @param udf An object of class 'datafusion_scalar_udf'
#'
#' @returns NULL, invisibly
#' @export
#'
sd_register_udf <- function(udf) {
- ctx <- ctx()
+ sd_ctx_register_udf(ctx(), udf)
+}
+
+#' @rdname sd_register_udf
+#' @export
+sd_ctx_register_udf <- function(ctx, udf) {
+ check_ctx(ctx)
ctx$register_scalar_udf(udf)
}
# nolint end
-# We use just one context for now. In theory we could support multiple
-# contexts with a shared runtime, which would scope the registration
-# of various components more cleanly from the runtime.
+# We mostly use one context but support isolated contexts via sd_ctx_*
+# functions, which more cleanly scope the registration of views and
+# UDFs.
ctx <- function() {
if (is.null(global_ctx$ctx)) {
- global_ctx$ctx <- InternalContext$new()
+ global_ctx$ctx <- sd_connect()
}
global_ctx$ctx
}
+check_ctx <- function(ctx) {
+ if (!inherits(ctx, "sedonadb::InternalContext")) {
+ stop(
+ sprintf(
+ "Expected ctx to be a SedonaDB context but got object of class '%s'",
+ class(ctx)[1]
+ )
+ )
+ }
+
+ invisible(ctx)
+}
+
global_ctx <- new.env(parent = emptyenv())
global_ctx$ctx <- NULL
diff --git a/r/sedonadb/R/dataframe.R b/r/sedonadb/R/dataframe.R
index 4bb4456d..6eb3f56a 100644
--- a/r/sedonadb/R/dataframe.R
+++ b/r/sedonadb/R/dataframe.R
@@ -18,6 +18,8 @@
#' Convert an object to a DataFrame
#'
#' @param x An object to convert
+#' @param ctx A SedonaDB context. This should always be passed to inner calls
+#' to SedonaDB functions; NULL implies the global context.
#' @param ... Extra arguments passed to/from methods
#' @param schema The requested schema
#'
@@ -27,29 +29,37 @@
#' @examples
#' as_sedonadb_dataframe(data.frame(x = 1:3))
#'
-as_sedonadb_dataframe <- function(x, ..., schema = NULL) {
+as_sedonadb_dataframe <- function(x, ..., schema = NULL, ctx = NULL) {
UseMethod("as_sedonadb_dataframe")
}
#' @export
-as_sedonadb_dataframe.sedonadb_dataframe <- function(x, ..., schema = NULL) {
+as_sedonadb_dataframe.sedonadb_dataframe <- function(x, ..., schema = NULL,
ctx = NULL) {
# In the future, schema can be handled with a cast
x
}
#' @export
-as_sedonadb_dataframe.data.frame <- function(x, ..., schema = NULL) {
+as_sedonadb_dataframe.data.frame <- function(x, ..., schema = NULL, ctx =
NULL) {
array <- nanoarrow::as_nanoarrow_array(x, schema = schema)
stream <- nanoarrow::basic_array_stream(list(array))
- ctx <- ctx()
+
+ if (is.null(ctx)) {
+ ctx <- ctx()
+ }
+
df <- ctx$data_frame_from_array_stream(stream, collect_now = TRUE)
new_sedonadb_dataframe(ctx, df)
}
#' @export
-as_sedonadb_dataframe.nanoarrow_array <- function(x, ..., schema = NULL) {
+as_sedonadb_dataframe.nanoarrow_array <- function(x, ..., schema = NULL, ctx =
NULL) {
stream <- nanoarrow::as_nanoarrow_array_stream(x, schema = schema)
- ctx <- ctx()
+
+ if (is.null(ctx)) {
+ ctx <- ctx()
+ }
+
df <- ctx$data_frame_from_array_stream(stream, collect_now = TRUE)
# Verify schema is handled
@@ -61,10 +71,15 @@ as_sedonadb_dataframe.nanoarrow_array_stream <- function(
x,
...,
schema = NULL,
- lazy = TRUE
+ lazy = TRUE,
+ ctx = NULL
) {
stream <- nanoarrow::as_nanoarrow_array_stream(x, schema = schema)
- ctx <- ctx()
+
+ if (is.null(ctx)) {
+ ctx <- ctx()
+ }
+
df <- ctx$data_frame_from_array_stream(stream, collect_now = !lazy)
# Verify schema is handled
@@ -72,8 +87,16 @@ as_sedonadb_dataframe.nanoarrow_array_stream <- function(
}
#' @export
-as_sedonadb_dataframe.datafusion_table_provider <- function(x, ..., schema =
NULL) {
- ctx <- ctx()
+as_sedonadb_dataframe.datafusion_table_provider <- function(
+ x,
+ ...,
+ schema = NULL,
+ ctx = NULL
+) {
+ if (is.null(ctx)) {
+ ctx <- ctx()
+ }
+
df <- ctx$data_frame_from_table_provider(x)
new_sedonadb_dataframe(ctx, df)
}
@@ -109,8 +132,12 @@ sd_count <- function(.data) {
#' sd_sql("SELECT 1 as one") |> sd_to_view("foofy")
#' sd_sql("SELECT * FROM foofy")
#'
-sd_to_view <- function(.data, table_ref, overwrite = FALSE) {
- .data <- as_sedonadb_dataframe(.data)
+sd_to_view <- function(.data, table_ref, overwrite = FALSE, ctx = NULL) {
+ if (is.null(ctx)) {
+ ctx <- ctx()
+ }
+
+ .data <- as_sedonadb_dataframe(.data, ctx = ctx)
.data$df$to_view(.data$ctx, table_ref, overwrite)
invisible(.data)
}
@@ -453,6 +480,12 @@ sd_summarize <- function(.data, ...) {
#'
#' @inheritParams sd_count
#' @param path A filename or directory to which parquet file(s) should be
written
+#' @param options A named list of key/value options to be used when
constructing
+#' a parquet writer. Common options are exposed as other arguments to
+#' `sd_write_parquet()`; however, this argument allows setting any DataFusion
+#' Parquet writer option. If an option is specified here and by another
+#' argument to this function, the value specified as an explicit argument
+#' takes precedence.
#' @param partition_by A character vector of column names to partition by. If
non-empty,
#' applies hive-style partitioning to the output
#' @param sort_by A character vector of column names to sort by. Currently only
@@ -475,6 +508,11 @@ sd_summarize <- function(.data, ...) {
#' that already exist in the input. This is useful in a read -> modify
#' -> write scenario to ensure these columns are up-to-date. If FALSE
#' (the default), an error will be raised if a bbox column already exists
+#' @param max_row_group_size Target maximum number of rows in each row group.
+#' Defaults to the global configuration value (1M rows).
+#' @param compression Sets the Parquet compression codec. Valid values are:
+#' uncompressed, snappy, gzip(level), brotli(level), lz4, zstd(level), and
+#' lz4_raw. Defaults to the global configuration value (zstd(3)).
#'
#' @returns The input, invisibly
#' @export
@@ -491,11 +529,14 @@ sd_summarize <- function(.data, ...) {
sd_write_parquet <- function(
.data,
path,
+ options = NULL,
partition_by = character(0),
sort_by = character(0),
single_file_output = NULL,
geoparquet_version = "1.0",
- overwrite_bbox_columns = FALSE
+ overwrite_bbox_columns = FALSE,
+ max_row_group_size = NULL,
+ compression = NULL
) {
.data <- as_sedonadb_dataframe(.data)
if (!is.null(.data$group_by)) {
@@ -507,22 +548,50 @@ sd_write_parquet <- function(
single_file_output <- length(partition_by) == 0 && grepl("\\.parquet$",
path)
}
- # Validate geoparquet_version
+ # Build the options list: start with user-provided options, then override
+ # with explicitly-specified arguments
+ if (is.null(options)) {
+ options <- list()
+ } else {
+ options <- as.list(options)
+ }
+
+ if (!is.null(max_row_group_size)) {
+ options[["max_row_group_size"]] <-
as.character(as.integer(max_row_group_size))
+ }
+
+ if (!is.null(compression)) {
+ options[["compression"]] <- as.character(compression)
+ }
+
+ # Validate and apply geoparquet_version
if (!is.null(geoparquet_version)) {
- if (!geoparquet_version %in% c("1.0", "1.1")) {
- stop("geoparquet_version must be '1.0' or '1.1'")
- }
+ options[["geoparquet_version"]] <- as.character(geoparquet_version)
+ }
+
+ options[["overwrite_bbox_columns"]] <-
tolower(as.character(overwrite_bbox_columns))
+
+ # Convert options to parallel character vectors for Rust
+ option_keys <- names(options)
+ option_values <- as.character(unlist(options, use.names = FALSE))
+
+ if (is.null(option_keys) || any(is.na(option_keys)) || any(option_keys ==
"")) {
+ stop("All option values must be named")
+ }
+
+ if (length(option_keys) != length(option_values)) {
+ stop("All option values must be length 1")
}
# Call the underlying Rust method
.data$df$to_parquet(
ctx = .data$ctx,
path = path,
+ option_keys = option_keys,
+ option_values = option_values,
partition_by = partition_by,
sort_by = sort_by,
- single_file_output = single_file_output,
- overwrite_bbox_columns = overwrite_bbox_columns,
- geoparquet_version = geoparquet_version
+ single_file_output = single_file_output
)
invisible(.data)
diff --git a/r/sedonadb/R/expression.R b/r/sedonadb/R/expression.R
index 223106c0..9866e23b 100644
--- a/r/sedonadb/R/expression.R
+++ b/r/sedonadb/R/expression.R
@@ -18,6 +18,7 @@
#' Create SedonaDB logical expressions
#'
#' @param column_name A column name
+#' @param x An object to convert to a SedonaDB literal (constant).
#' @param qualifier An optional qualifier (e.g., table reference) that may be
#' used to disambiguate a specific reference
#' @param function_name The name of the function to call. This name is resolved
diff --git a/r/sedonadb/man/as_sedonadb_dataframe.Rd
b/r/sedonadb/man/as_sedonadb_dataframe.Rd
index b5c30606..a782ddd2 100644
--- a/r/sedonadb/man/as_sedonadb_dataframe.Rd
+++ b/r/sedonadb/man/as_sedonadb_dataframe.Rd
@@ -4,7 +4,7 @@
\alias{as_sedonadb_dataframe}
\title{Convert an object to a DataFrame}
\usage{
-as_sedonadb_dataframe(x, ..., schema = NULL)
+as_sedonadb_dataframe(x, ..., schema = NULL, ctx = NULL)
}
\arguments{
\item{x}{An object to convert}
@@ -12,6 +12,9 @@ as_sedonadb_dataframe(x, ..., schema = NULL)
\item{...}{Extra arguments passed to/from methods}
\item{schema}{The requested schema}
+
+\item{ctx}{A SedonaDB context. This should always be passed to inner calls
+to SedonaDB functions; NULL implies the global context.}
}
\value{
A sedonadb_dataframe
diff --git a/r/sedonadb/man/sd_connect.Rd b/r/sedonadb/man/sd_connect.Rd
new file mode 100644
index 00000000..72afda94
--- /dev/null
+++ b/r/sedonadb/man/sd_connect.Rd
@@ -0,0 +1,49 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/context.R
+\name{sd_connect}
+\alias{sd_connect}
+\title{Create a SedonaDB context}
+\usage{
+sd_connect(
+ ...,
+ global = FALSE,
+ memory_limit = NULL,
+ temp_dir = NULL,
+ memory_pool_type = NULL,
+ unspillable_reserve_ratio = NULL
+)
+}
+\arguments{
+\item{...}{Reserved for future options}
+
+\item{global}{Use TRUE to set options on the global context.}
+
+\item{memory_limit}{Maximum memory for query execution, as a
+human-readable string (e.g., \code{"4gb"}, \code{"512m"}) or \code{NULL} for
+unbounded (the default).}
+
+\item{temp_dir}{Directory for temporary/spill files, or \code{NULL} to
+use the DataFusion default.}
+
+\item{memory_pool_type}{Memory pool type: \code{"greedy"} (default) or
+\code{"fair"}. Only takes effect when \code{memory_limit} is set.}
+
+\item{unspillable_reserve_ratio}{Fraction of memory (0--1) reserved for
+unspillable consumers. Only applies when \code{memory_pool_type} is
+\code{"fair"}. Defaults to 0.2 when not explicitly set.}
+}
+\value{
+The constructed context, invisibly.
+}
+\description{
+Runtime options configure the execution environment. Use
+\code{global = TRUE} to configure the global context or use the
+returned object as a scoped context. A scoped context is
+recommended for programmatic usage as it prevents named
+views from interfering with each other.
+}
+\examples{
+sd_connect(memory_limit = "100mb", memory_pool_type = "fair")
+
+
+}
diff --git a/r/sedonadb/man/sd_drop_view.Rd b/r/sedonadb/man/sd_drop_view.Rd
index 046b2607..a66e89c3 100644
--- a/r/sedonadb/man/sd_drop_view.Rd
+++ b/r/sedonadb/man/sd_drop_view.Rd
@@ -2,15 +2,23 @@
% Please edit documentation in R/context.R
\name{sd_drop_view}
\alias{sd_drop_view}
+\alias{sd_ctx_drop_view}
\alias{sd_view}
+\alias{sd_ctx_view}
\title{Create or Drop a named view}
\usage{
sd_drop_view(table_ref)
+sd_ctx_drop_view(ctx, table_ref)
+
sd_view(table_ref)
+
+sd_ctx_view(ctx, table_ref)
}
\arguments{
\item{table_ref}{The name of the view reference}
+
+\item{ctx}{A SedonaDB context.}
}
\value{
The context, invisibly
diff --git a/r/sedonadb/man/sd_expr_column.Rd b/r/sedonadb/man/sd_expr_column.Rd
index f3a6a521..76a12c14 100644
--- a/r/sedonadb/man/sd_expr_column.Rd
+++ b/r/sedonadb/man/sd_expr_column.Rd
@@ -61,6 +61,8 @@ used to disambiguate a specific reference}
\item{factory}{A \code{\link[=sd_expr_factory]{sd_expr_factory()}}. This
factory wraps a SedonaDB context
and is used to resolve scalar functions and/or retrieve options.}
+\item{x}{An object to convert to a SedonaDB literal (constant).}
+
\item{type}{A destination type into which \code{expr} should be cast.}
\item{op}{Operator name for a binary expression. In general these follow
diff --git a/r/sedonadb/man/sd_read_parquet.Rd
b/r/sedonadb/man/sd_read_parquet.Rd
index c370c777..e6a9ae42 100644
--- a/r/sedonadb/man/sd_read_parquet.Rd
+++ b/r/sedonadb/man/sd_read_parquet.Rd
@@ -2,12 +2,17 @@
% Please edit documentation in R/context.R
\name{sd_read_parquet}
\alias{sd_read_parquet}
+\alias{sd_ctx_read_parquet}
\title{Create a DataFrame from one or more Parquet files}
\usage{
sd_read_parquet(path)
+
+sd_ctx_read_parquet(ctx, path)
}
\arguments{
\item{path}{One or more paths or URIs to Parquet files}
+
+\item{ctx}{A SedonaDB context.}
}
\value{
A sedonadb_dataframe
diff --git a/r/sedonadb/man/sd_register_udf.Rd
b/r/sedonadb/man/sd_register_udf.Rd
index 345a69ab..c9066936 100644
--- a/r/sedonadb/man/sd_register_udf.Rd
+++ b/r/sedonadb/man/sd_register_udf.Rd
@@ -2,12 +2,17 @@
% Please edit documentation in R/context.R
\name{sd_register_udf}
\alias{sd_register_udf}
+\alias{sd_ctx_register_udf}
\title{Register a user-defined function}
\usage{
sd_register_udf(udf)
+
+sd_ctx_register_udf(ctx, udf)
}
\arguments{
\item{udf}{An object of class 'datafusion_scalar_udf'}
+
+\item{ctx}{A SedonaDB context.}
}
\value{
NULL, invisibly
diff --git a/r/sedonadb/man/sd_sql.Rd b/r/sedonadb/man/sd_sql.Rd
index 34807207..dc670062 100644
--- a/r/sedonadb/man/sd_sql.Rd
+++ b/r/sedonadb/man/sd_sql.Rd
@@ -2,12 +2,17 @@
% Please edit documentation in R/context.R
\name{sd_sql}
\alias{sd_sql}
+\alias{sd_ctx_sql}
\title{Create a DataFrame from SQL}
\usage{
sd_sql(sql)
+
+sd_ctx_sql(ctx, sql)
}
\arguments{
\item{sql}{A SQL string to execute}
+
+\item{ctx}{A SedonaDB context.}
}
\value{
A sedonadb_dataframe
diff --git a/r/sedonadb/man/sd_to_view.Rd b/r/sedonadb/man/sd_to_view.Rd
index dce28849..3acb58bf 100644
--- a/r/sedonadb/man/sd_to_view.Rd
+++ b/r/sedonadb/man/sd_to_view.Rd
@@ -4,13 +4,15 @@
\alias{sd_to_view}
\title{Register a DataFrame as a named view}
\usage{
-sd_to_view(.data, table_ref, overwrite = FALSE)
+sd_to_view(.data, table_ref, ctx = NULL, overwrite = FALSE)
}
\arguments{
\item{.data}{A sedonadb_dataframe or an object that can be coerced to one.}
\item{table_ref}{The name of the view reference}
+\item{ctx}{A SedonaDB context.}
+
\item{overwrite}{Use TRUE to overwrite a view with the same name (if it
exists)}
}
\value{
diff --git a/r/sedonadb/man/sd_write_parquet.Rd
b/r/sedonadb/man/sd_write_parquet.Rd
index b2d1d25b..e5b4dbf0 100644
--- a/r/sedonadb/man/sd_write_parquet.Rd
+++ b/r/sedonadb/man/sd_write_parquet.Rd
@@ -7,11 +7,14 @@
sd_write_parquet(
.data,
path,
+ options = NULL,
partition_by = character(0),
sort_by = character(0),
single_file_output = NULL,
geoparquet_version = "1.0",
- overwrite_bbox_columns = FALSE
+ overwrite_bbox_columns = FALSE,
+ max_row_group_size = NULL,
+ compression = NULL
)
}
\arguments{
@@ -19,6 +22,13 @@ sd_write_parquet(
\item{path}{A filename or directory to which parquet file(s) should be written}
+\item{options}{A named list of key/value options to be used when constructing
+a parquet writer. Common options are exposed as other arguments to
+\code{sd_write_parquet()}; however, this argument allows setting any DataFusion
+Parquet writer option. If an option is specified here and by another
+argument to this function, the value specified as an explicit argument
+takes precedence.}
+
\item{partition_by}{A character vector of column names to partition by. If
non-empty,
applies hive-style partitioning to the output}
@@ -45,6 +55,13 @@ will be named "[geom_col_name]_bbox" for all geometry
columns except
that already exist in the input. This is useful in a read -> modify
-> write scenario to ensure these columns are up-to-date. If FALSE
(the default), an error will be raised if a bbox column already exists}
+
+\item{max_row_group_size}{Target maximum number of rows in each row group.
+Defaults to the global configuration value (1M rows).}
+
+\item{compression}{Sets the Parquet compression codec. Valid values are:
+uncompressed, snappy, gzip(level), brotli(level), lz4, zstd(level), and
+lz4_raw. Defaults to the global configuration value (zstd(3)).}
}
\value{
The input, invisibly
diff --git a/r/sedonadb/src/init.c b/r/sedonadb/src/init.c
index 95c44800..c2af14c0 100644
--- a/r/sedonadb/src/init.c
+++ b/r/sedonadb/src/init.c
@@ -106,8 +106,10 @@ SEXP savvy_InternalContext_list_functions__impl(SEXP
self__) {
return handle_result(res);
}
-SEXP savvy_InternalContext_new__impl(void) {
- SEXP res = savvy_InternalContext_new__ffi();
+SEXP savvy_InternalContext_new__impl(SEXP c_arg__option_keys,
+ SEXP c_arg__option_values) {
+ SEXP res =
+ savvy_InternalContext_new__ffi(c_arg__option_keys, c_arg__option_values);
return handle_result(res);
}
@@ -219,13 +221,12 @@ SEXP savvy_InternalDataFrame_to_arrow_stream__impl(
}
SEXP savvy_InternalDataFrame_to_parquet__impl(
- SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by,
- SEXP c_arg__sort_by, SEXP c_arg__single_file_output,
- SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version) {
+ SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__option_keys,
+ SEXP c_arg__option_values, SEXP c_arg__partition_by, SEXP c_arg__sort_by,
+ SEXP c_arg__single_file_output) {
SEXP res = savvy_InternalDataFrame_to_parquet__ffi(
- self__, c_arg__ctx, c_arg__path, c_arg__partition_by, c_arg__sort_by,
- c_arg__single_file_output, c_arg__overwrite_bbox_columns,
- c_arg__geoparquet_version);
+ self__, c_arg__ctx, c_arg__path, c_arg__option_keys,
c_arg__option_values,
+ c_arg__partition_by, c_arg__sort_by, c_arg__single_file_output);
return handle_result(res);
}
@@ -335,7 +336,7 @@ static const R_CallMethodDef CallEntries[] = {
{"savvy_InternalContext_list_functions__impl",
(DL_FUNC)&savvy_InternalContext_list_functions__impl, 1},
{"savvy_InternalContext_new__impl",
- (DL_FUNC)&savvy_InternalContext_new__impl, 0},
+ (DL_FUNC)&savvy_InternalContext_new__impl, 2},
{"savvy_InternalContext_read_parquet__impl",
(DL_FUNC)&savvy_InternalContext_read_parquet__impl, 2},
{"savvy_InternalContext_register_scalar_udf__impl",
diff --git a/r/sedonadb/src/rust/api.h b/r/sedonadb/src/rust/api.h
index f61a5df1..152f17bf 100644
--- a/r/sedonadb/src/rust/api.h
+++ b/r/sedonadb/src/rust/api.h
@@ -31,7 +31,8 @@ SEXP
savvy_InternalContext_data_frame_from_table_provider__ffi(
SEXP savvy_InternalContext_deregister_table__ffi(SEXP self__,
SEXP c_arg__table_ref);
SEXP savvy_InternalContext_list_functions__ffi(SEXP self__);
-SEXP savvy_InternalContext_new__ffi(void);
+SEXP savvy_InternalContext_new__ffi(SEXP c_arg__option_keys,
+ SEXP c_arg__option_values);
SEXP savvy_InternalContext_read_parquet__ffi(SEXP self__, SEXP c_arg__paths);
SEXP savvy_InternalContext_register_scalar_udf__ffi(
SEXP self__, SEXP c_arg__scalar_udf_xptr);
@@ -61,9 +62,9 @@ SEXP savvy_InternalDataFrame_to_arrow_schema__ffi(SEXP
self__, SEXP c_arg__out);
SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(
SEXP self__, SEXP c_arg__out, SEXP c_arg__requested_schema_xptr);
SEXP savvy_InternalDataFrame_to_parquet__ffi(
- SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by,
- SEXP c_arg__sort_by, SEXP c_arg__single_file_output,
- SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version);
+ SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__option_keys,
+ SEXP c_arg__option_values, SEXP c_arg__partition_by, SEXP c_arg__sort_by,
+ SEXP c_arg__single_file_output);
SEXP savvy_InternalDataFrame_to_provider__ffi(SEXP self__);
SEXP savvy_InternalDataFrame_to_view__ffi(SEXP self__, SEXP c_arg__ctx,
SEXP c_arg__table_ref,
diff --git a/r/sedonadb/src/rust/src/context.rs
b/r/sedonadb/src/rust/src/context.rs
index ca6ffc45..fa524389 100644
--- a/r/sedonadb/src/rust/src/context.rs
+++ b/r/sedonadb/src/rust/src/context.rs
@@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::RecordBatchReader;
@@ -22,7 +23,10 @@ use datafusion::catalog::{MemTable, TableProvider};
use datafusion_ffi::udf::FFI_ScalarUDF;
use savvy::{savvy, savvy_err, IntoExtPtrSexp, OwnedStringSexp, Result};
-use sedona::{context::SedonaContext,
record_batch_reader_provider::RecordBatchReaderProvider};
+use sedona::{
+ context::SedonaContext, context_builder::SedonaContextBuilder,
+ record_batch_reader_provider::RecordBatchReaderProvider,
+};
use sedona_geoparquet::provider::GeoParquetReadOptions;
use tokio::runtime::Runtime;
@@ -40,12 +44,26 @@ pub struct InternalContext {
#[savvy]
impl InternalContext {
- pub fn new() -> Result<Self> {
+ pub fn new(option_keys: savvy::Sexp, option_values: savvy::Sexp) ->
Result<Self> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
- let inner = wait_for_future_captured_r(&runtime,
SedonaContext::new_local_interactive())??;
+ let keys = savvy::StringSexp::try_from(option_keys)?;
+ let values = savvy::StringSexp::try_from(option_values)?;
+
+ let options: HashMap<String, String> = keys
+ .iter()
+ .zip(values.iter())
+ .map(|(k, v)| (k.to_string(), v.to_string()))
+ .collect();
+
+ let inner = if options.is_empty() {
+ wait_for_future_captured_r(&runtime,
SedonaContext::new_local_interactive())??
+ } else {
+ let builder = SedonaContextBuilder::from_options(&options)?;
+ wait_for_future_captured_r(&runtime, builder.build())??
+ };
Ok(Self {
inner: Arc::new(inner),
diff --git a/r/sedonadb/src/rust/src/dataframe.rs
b/r/sedonadb/src/rust/src/dataframe.rs
index d5bdb460..371785fa 100644
--- a/r/sedonadb/src/rust/src/dataframe.rs
+++ b/r/sedonadb/src/rust/src/dataframe.rs
@@ -19,6 +19,7 @@ use arrow_array::ffi::FFI_ArrowSchema;
use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use datafusion::catalog::MemTable;
+use datafusion::config::ConfigField;
use datafusion::prelude::DataFrame;
use datafusion_common::Column;
use datafusion_expr::utils::conjunction;
@@ -28,7 +29,7 @@ use savvy::{savvy, savvy_err, sexp, IntoExtPtrSexp, Result};
use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
use sedona::reader::SedonaStreamReader;
use sedona::show::{DisplayMode, DisplayTableOptions};
-use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions};
+use sedona_geoparquet::options::TableGeoParquetOptions;
use sedona_schema::schema::SedonaSchema;
use std::{iter::zip, ptr::swap_nonoverlapping, sync::Arc};
use tokio::runtime::Runtime;
@@ -231,12 +232,15 @@ impl InternalDataFrame {
&self,
ctx: &InternalContext,
path: &str,
+ option_keys: savvy::Sexp,
+ option_values: savvy::Sexp,
partition_by: savvy::Sexp,
sort_by: savvy::Sexp,
single_file_output: bool,
- overwrite_bbox_columns: bool,
- geoparquet_version: Option<&str>,
) -> savvy::Result<()> {
+ let option_keys_strsxp = savvy::StringSexp::try_from(option_keys)?;
+ let option_values_strsxp = savvy::StringSexp::try_from(option_values)?;
+
let partition_by_strsxp = savvy::StringSexp::try_from(partition_by)?;
let partition_by_vec = partition_by_strsxp
.iter()
@@ -257,22 +261,12 @@ impl InternalDataFrame {
})
.collect::<Vec<_>>();
- let options = SedonaWriteOptions::new()
+ let write_options = SedonaWriteOptions::new()
.with_partition_by(partition_by_vec)
.with_sort_by(sort_by_expr)
.with_single_file_output(single_file_output);
- let mut writer_options = TableGeoParquetOptions {
- overwrite_bbox_columns,
- ..Default::default()
- };
- if let Some(geoparquet_version) = geoparquet_version {
- writer_options.geoparquet_version = geoparquet_version
- .parse()
+ .map_err(|e| savvy::Error::new(format!("Invalid geoparquet_version: {e}")))?;
- } else {
- writer_options.geoparquet_version = GeoParquetVersion::Omitted;
- }
+ let mut writer_options = TableGeoParquetOptions::default();
// Resolve writer options from the context configuration
let global_parquet_options = ctx
@@ -286,13 +280,25 @@ impl InternalDataFrame {
.clone();
writer_options.inner.global = global_parquet_options;
+ // Apply user-specified options
+ for (k, v) in option_keys_strsxp.iter().zip(option_values_strsxp.iter()) {
+ writer_options
+ .set(k, v)
+ .map_err(|e| savvy::Error::new(format!("{e}")))?;
+ }
+
let inner = self.inner.clone();
let inner_context = ctx.inner.clone();
let path_owned = path.to_string();
wait_for_future_captured_r(&self.runtime, async move {
inner
- .write_geoparquet(&inner_context, &path_owned, options, Some(writer_options))
+ .write_geoparquet(
+ &inner_context,
+ &path_owned,
+ write_options,
+ Some(writer_options),
+ )
.await
})??;
diff --git a/r/sedonadb/tests/testthat/_snaps/context.md b/r/sedonadb/tests/testthat/_snaps/context.md
new file mode 100644
index 00000000..76c5ef83
--- /dev/null
+++ b/r/sedonadb/tests/testthat/_snaps/context.md
@@ -0,0 +1,8 @@
+# the global context is never replaced
+
+ Cannot change runtime options after the context has been initialized. Set global options with sd_connect() before executing your first query or use a scoped context by passing global = FALSE
+
+# unrecognized options result in a warning
+
+ Unrecognized options for sd_connect(): not_an_option
+
diff --git a/r/sedonadb/tests/testthat/_snaps/dataframe.md b/r/sedonadb/tests/testthat/_snaps/dataframe.md
index 2164d4c6..e9f66332 100644
--- a/r/sedonadb/tests/testthat/_snaps/dataframe.md
+++ b/r/sedonadb/tests/testthat/_snaps/dataframe.md
@@ -26,3 +26,15 @@
+------------+
Preview of up to 6 row(s)
+# sd_write_parquet validates geoparquet_version parameter
+
+ This feature is not implemented: GeoParquetVersion V2_0 is not yet supported
+
+# sd_write_parquet() errors for inappropriately sized options
+
+ All option values must be length 1
+
+---
+
+ All option values must be named
+
diff --git a/r/sedonadb/tests/testthat/test-context.R b/r/sedonadb/tests/testthat/test-context.R
index 0f2647dc..9ee4b672 100644
--- a/r/sedonadb/tests/testthat/test-context.R
+++ b/r/sedonadb/tests/testthat/test-context.R
@@ -15,6 +15,53 @@
# specific language governing permissions and limitations
# under the License.
+test_that("the global context is never replaced", {
+ # Check a few times to make sure this is true
+ ctx <- sd_connect(global = TRUE)
+ expect_true(rlang::is_reference(ctx, global_ctx$ctx))
+
+ ctx <- sd_connect(global = TRUE)
+ expect_true(rlang::is_reference(ctx, global_ctx$ctx))
+
+ expect_snapshot_warning(
+ expect_true(
+ rlang::is_reference(
+ sd_connect(global = TRUE, memory_limit = "5g"),
+ global_ctx$ctx
+ )
+ )
+ )
+})
+
+test_that("scoped connections can be created", {
+ ctx <- sd_connect(
+ memory_limit = "1g",
+ temp_dir = tempfile(),
+ memory_pool_type = "fair",
+ unspillable_reserve_ratio = 0.5
+ )
+
+ df <- data.frame(x = 1:10)
+ sd_to_view(df, "some_name", ctx = ctx, overwrite = TRUE)
+
+ df2 <- data.frame(y = 11:20)
+ sd_to_view(df2, "some_name", overwrite = TRUE)
+
+ expect_identical(
+ ctx |> sd_ctx_view("some_name") |> sd_collect(),
+ df
+ )
+
+ expect_identical(
+ sd_view("some_name") |> sd_collect(),
+ df2
+ )
+})
+
+test_that("unrecognized options result in a warning", {
+ expect_snapshot_warning(sd_connect(not_an_option = "foofy"))
+})
+
test_that("sd_read_parquet() works", {
path <- system.file("files/natural-earth_cities_geo.parquet", package = "sedonadb")
expect_identical(sd_count(sd_read_parquet(path)), 243)
diff --git a/r/sedonadb/tests/testthat/test-dataframe.R b/r/sedonadb/tests/testthat/test-dataframe.R
index 6697254c..7ef09e2d 100644
--- a/r/sedonadb/tests/testthat/test-dataframe.R
+++ b/r/sedonadb/tests/testthat/test-dataframe.R
@@ -284,9 +284,74 @@ test_that("sd_write_parquet validates geoparquet_version parameter", {
)
# Invalid version should error
- expect_error(
+ expect_snapshot_error(
sd_write_parquet(df, tmp_parquet_file, geoparquet_version = "2.0"),
- "geoparquet_version must be"
+ )
+})
+
+test_that("sd_write_parquet accepts max_row_group_size parameter", {
+ skip_if_not_installed("arrow")
+
+ tmp_parquet_file <- tempfile(fileext = ".parquet")
+ tmp_parquet_file_tiny_groups <- tempfile(fileext = ".parquet")
+ on.exit(unlink(c(tmp_parquet_file, tmp_parquet_file_tiny_groups)))
+
+ df <- data.frame(x = 1:1e5)
+ sd_write_parquet(df, tmp_parquet_file)
+ sd_write_parquet(df, tmp_parquet_file_tiny_groups, max_row_group_size = 100)
+
+ # Check file size (tiny row groups have more metadata)
+ expect_gt(
+ file.size(tmp_parquet_file_tiny_groups),
+ file.size(tmp_parquet_file)
+ )
+})
+
+test_that("sd_write_parquet accepts compression parameter", {
+ tmp_parquet_file <- tempfile(fileext = ".parquet")
+ tmp_parquet_file_uncompressed <- tempfile(fileext = ".parquet")
+ on.exit(unlink(c(tmp_parquet_file, tmp_parquet_file_uncompressed)))
+
+ df <- data.frame(x = 1:1e5)
+ sd_write_parquet(df, tmp_parquet_file)
+ sd_write_parquet(df, tmp_parquet_file_uncompressed, compression = "uncompressed")
+
+ expect_gt(
+ file.size(tmp_parquet_file_uncompressed),
+ file.size(tmp_parquet_file)
+ )
+})
+
+test_that("sd_write_parquet accepts options parameter", {
+ tmp_parquet_file <- tempfile(fileext = ".parquet")
+ tmp_parquet_file_uncompressed <- tempfile(fileext = ".parquet")
+ on.exit(unlink(c(tmp_parquet_file, tmp_parquet_file_uncompressed)))
+
+ df <- data.frame(x = 1:1e5)
+ sd_write_parquet(df, tmp_parquet_file)
+ sd_write_parquet(
+ df,
+ tmp_parquet_file_uncompressed,
+ options = list(compression = "uncompressed")
+ )
+
+ expect_gt(
+ file.size(tmp_parquet_file_uncompressed),
+ file.size(tmp_parquet_file)
+ )
+})
+
+test_that("sd_write_parquet() errors for inappropriately sized options", {
+ tmp_parquet_file <- tempfile(fileext = ".parquet")
+ on.exit(unlink(tmp_parquet_file))
+
+ df <- data.frame(x = 1:10)
+ expect_snapshot_error(
+ sd_write_parquet(df, tmp_parquet_file, options = list(foofy = character(0)))
+ )
+
+ expect_snapshot_error(
+ sd_write_parquet(df, tmp_parquet_file, options = list("option without name"))
)
})