This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new c876761  feat(r/sedonadb): Add FFI support for ScalarUDF and TableProvider (#214)
c876761 is described below

commit c8767616297263d4436160894aa9d9a352923073
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu Oct 16 15:09:40 2025 -0500

    feat(r/sedonadb): Add FFI support for ScalarUDF and TableProvider (#214)
---
 Cargo.lock                                 |  1 +
 r/sedonadb/NAMESPACE                       |  2 +
 r/sedonadb/R/000-wrappers.R                | 32 +++++++++-
 r/sedonadb/R/context.R                     | 17 ++++++
 r/sedonadb/R/dataframe.R                   | 15 ++++-
 r/sedonadb/man/sd_register_udf.Rd          | 20 +++++++
 r/sedonadb/src/init.c                      | 45 +++++++++++++--
 r/sedonadb/src/rust/Cargo.toml             |  1 +
 r/sedonadb/src/rust/api.h                  |  9 ++-
 r/sedonadb/src/rust/src/context.rs         | 47 ++++++++++-----
 r/sedonadb/src/rust/src/dataframe.rs       | 31 +++++++++-
 r/sedonadb/src/rust/src/ffi.rs             | 93 ++++++++++++++++++++++++++++++
 r/sedonadb/src/rust/src/lib.rs             |  1 +
 r/sedonadb/tests/testthat/test-context.R   | 12 ++++
 r/sedonadb/tests/testthat/test-dataframe.R | 15 +++++
 15 files changed, 316 insertions(+), 25 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1eb1bc3..94550ec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5223,6 +5223,7 @@ dependencies = [
  "datafusion",
  "datafusion-common",
  "datafusion-expr",
+ "datafusion-ffi",
  "savvy",
  "savvy-ffi",
  "sedona",
diff --git a/r/sedonadb/NAMESPACE b/r/sedonadb/NAMESPACE
index 5e78149..73ebd6d 100644
--- a/r/sedonadb/NAMESPACE
+++ b/r/sedonadb/NAMESPACE
@@ -5,6 +5,7 @@ S3method("[[<-",savvy_sedonadb__sealed)
 S3method(as.data.frame,sedonadb_dataframe)
 S3method(as_nanoarrow_array_stream,sedonadb_dataframe)
 S3method(as_sedonadb_dataframe,data.frame)
+S3method(as_sedonadb_dataframe,datafusion_table_provider)
 S3method(as_sedonadb_dataframe,nanoarrow_array)
 S3method(as_sedonadb_dataframe,nanoarrow_array_stream)
 S3method(as_sedonadb_dataframe,sedonadb_dataframe)
@@ -24,6 +25,7 @@ export(sd_count)
 export(sd_drop_view)
 export(sd_preview)
 export(sd_read_parquet)
+export(sd_register_udf)
 export(sd_sql)
 export(sd_to_view)
 export(sd_view)
diff --git a/r/sedonadb/R/000-wrappers.R b/r/sedonadb/R/000-wrappers.R
index 15ca659..c93fe0b 100644
--- a/r/sedonadb/R/000-wrappers.R
+++ b/r/sedonadb/R/000-wrappers.R
@@ -77,6 +77,12 @@ NULL
   }
 }
 
+`InternalContext_data_frame_from_table_provider` <- function(self) {
+  function(`provider_xptr`) {
+    .savvy_wrap_InternalDataFrame(.Call(savvy_InternalContext_data_frame_from_table_provider__impl, `self`, `provider_xptr`))
+  }
+}
+
 `InternalContext_deregister_table` <- function(self) {
   function(`table_ref`) {
     invisible(.Call(savvy_InternalContext_deregister_table__impl, `self`, `table_ref`))
@@ -89,6 +95,18 @@ NULL
   }
 }
 
+`InternalContext_register_scalar_udf` <- function(self) {
+  function(`scalar_udf_xptr`) {
+    invisible(.Call(savvy_InternalContext_register_scalar_udf__impl, `self`, `scalar_udf_xptr`))
+  }
+}
+
+`InternalContext_scalar_udf_xptr` <- function(self) {
+  function(`name`) {
+    .Call(savvy_InternalContext_scalar_udf_xptr__impl, `self`, `name`)
+  }
+}
+
 `InternalContext_sql` <- function(self) {
   function(`query`) {
     .savvy_wrap_InternalDataFrame(.Call(savvy_InternalContext_sql__impl, `self`, `query`))
@@ -105,8 +123,11 @@ NULL
   e <- new.env(parent = emptyenv())
   e$.ptr <- ptr
   e$`data_frame_from_array_stream` <- `InternalContext_data_frame_from_array_stream`(ptr)
+  e$`data_frame_from_table_provider` <- `InternalContext_data_frame_from_table_provider`(ptr)
   e$`deregister_table` <- `InternalContext_deregister_table`(ptr)
   e$`read_parquet` <- `InternalContext_read_parquet`(ptr)
+  e$`register_scalar_udf` <- `InternalContext_register_scalar_udf`(ptr)
+  e$`scalar_udf_xptr` <- `InternalContext_scalar_udf_xptr`(ptr)
   e$`sql` <- `InternalContext_sql`(ptr)
   e$`view` <- `InternalContext_view`(ptr)
 
@@ -179,8 +200,8 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
 }
 
 `InternalDataFrame_to_arrow_stream` <- function(self) {
-  function(`out`) {
-    invisible(.Call(savvy_InternalDataFrame_to_arrow_stream__impl, `self`, `out`))
+  function(`out`, `requested_schema_xptr`) {
+    invisible(.Call(savvy_InternalDataFrame_to_arrow_stream__impl, `self`, `out`, `requested_schema_xptr`))
   }
 }
 
@@ -191,6 +212,12 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
   }
 }
 
+`InternalDataFrame_to_provider` <- function(self) {
+  function() {
+    .Call(savvy_InternalDataFrame_to_provider__impl, `self`)
+  }
+}
+
 `InternalDataFrame_to_view` <- function(self) {
   function(`ctx`, `table_ref`, `overwrite`) {
     `ctx` <- .savvy_extract_ptr(`ctx`, "InternalContext")
@@ -210,6 +237,7 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
   e$`to_arrow_schema` <- `InternalDataFrame_to_arrow_schema`(ptr)
   e$`to_arrow_stream` <- `InternalDataFrame_to_arrow_stream`(ptr)
   e$`to_parquet` <- `InternalDataFrame_to_parquet`(ptr)
+  e$`to_provider` <- `InternalDataFrame_to_provider`(ptr)
   e$`to_view` <- `InternalDataFrame_to_view`(ptr)
 
   class(e) <- c("InternalDataFrame", "savvy_sedonadb__sealed")
diff --git a/r/sedonadb/R/context.R b/r/sedonadb/R/context.R
index 86a55f8..2c921b9 100644
--- a/r/sedonadb/R/context.R
+++ b/r/sedonadb/R/context.R
@@ -80,6 +80,23 @@ sd_view <- function(table_ref) {
   new_sedonadb_dataframe(ctx, df)
 }
 
+#' Register a user-defined function
+#'
+#' Several types of user-defined functions can be registered into a session
+#' context. Currently, the only implemented variety is an external pointer
+#' to a Rust `FFI_ScalarUDF`, an example of which is available from the
+#' [DataFusion Python documentation](https://github.com/apache/datafusion-python/blob/6f3b1cab75cfaa0cdf914f9b6fa023cb9afccd7d/examples/datafusion-ffi-example/src/scalar_udf.rs).
+#'
+#' @param udf An object of class 'datafusion_scalar_udf'
+#'
+#' @returns NULL, invisibly
+#' @export
+#'
+sd_register_udf <- function(udf) {
+  ctx <- ctx()
+  ctx$register_scalar_udf(udf)
+}
+
 # We use just one context for now. In theory we could support multiple
 # contexts with a shared runtime, which would scope the registration
 # of various components more cleanly from the runtime.
diff --git a/r/sedonadb/R/dataframe.R b/r/sedonadb/R/dataframe.R
index 0e5b511..4dabad5 100644
--- a/r/sedonadb/R/dataframe.R
+++ b/r/sedonadb/R/dataframe.R
@@ -67,6 +67,13 @@ as_sedonadb_dataframe.nanoarrow_array_stream <- function(x, ..., schema = NULL,
   as_sedonadb_dataframe(new_sedonadb_dataframe(ctx, df), schema = schema)
 }
 
+#' @export
+as_sedonadb_dataframe.datafusion_table_provider <- function(x, ..., schema = NULL) {
+  ctx <- ctx()
+  df <- ctx$data_frame_from_table_provider(x)
+  as_sedonadb_dataframe(new_sedonadb_dataframe(ctx, df), schema = schema)
+}
+
 #' Count rows in a DataFrame
 #'
 #' @param .data A sedonadb_dataframe
@@ -297,9 +304,13 @@ infer_nanoarrow_schema.sedonadb_dataframe <- function(x, ...) {
 
 #' @importFrom nanoarrow as_nanoarrow_array_stream
 #' @export
-as_nanoarrow_array_stream.sedonadb_dataframe <- function(x, ...) {
+as_nanoarrow_array_stream.sedonadb_dataframe <- function(x, ..., schema = NULL) {
+  if (!is.null(schema)) {
+    schema <- nanoarrow::as_nanoarrow_schema(schema)
+  }
+
   stream <- nanoarrow::nanoarrow_allocate_array_stream()
-  x$df$to_arrow_stream(stream)
+  x$df$to_arrow_stream(stream, schema)
   stream
 }
 
diff --git a/r/sedonadb/man/sd_register_udf.Rd b/r/sedonadb/man/sd_register_udf.Rd
new file mode 100644
index 0000000..345a69a
--- /dev/null
+++ b/r/sedonadb/man/sd_register_udf.Rd
@@ -0,0 +1,20 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/context.R
+\name{sd_register_udf}
+\alias{sd_register_udf}
+\title{Register a user-defined function}
+\usage{
+sd_register_udf(udf)
+}
+\arguments{
+\item{udf}{An object of class 'datafusion_scalar_udf'}
+}
+\value{
+NULL, invisibly
+}
+\description{
+Several types of user-defined functions can be registered into a session
+context. Currently, the only implemented variety is an external pointer
+to a Rust \code{FFI_ScalarUDF}, an example of which is available from the
+\href{https://github.com/apache/datafusion-python/blob/6f3b1cab75cfaa0cdf914f9b6fa023cb9afccd7d/examples/datafusion-ffi-example/src/scalar_udf.rs}{DataFusion Python documentation}.
+}
diff --git a/r/sedonadb/src/init.c b/r/sedonadb/src/init.c
index 6988665..f72a3ca 100644
--- a/r/sedonadb/src/init.c
+++ b/r/sedonadb/src/init.c
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <Rinternals.h>
 #include <stdint.h>
 
+#include <Rinternals.h>
+
 #include <R_ext/Parse.h>
 
 #include "rust/api.h"
@@ -83,6 +84,13 @@ SEXP savvy_InternalContext_data_frame_from_array_stream__impl(
   return handle_result(res);
 }
 
+SEXP savvy_InternalContext_data_frame_from_table_provider__impl(
+    SEXP self__, SEXP c_arg__provider_xptr) {
+  SEXP res = savvy_InternalContext_data_frame_from_table_provider__ffi(
+      self__, c_arg__provider_xptr);
+  return handle_result(res);
+}
+
 SEXP savvy_InternalContext_deregister_table__impl(SEXP self__,
                                                   SEXP c_arg__table_ref) {
   SEXP res =
@@ -100,6 +108,19 @@ SEXP savvy_InternalContext_read_parquet__impl(SEXP self__, SEXP c_arg__paths) {
   return handle_result(res);
 }
 
+SEXP savvy_InternalContext_register_scalar_udf__impl(
+    SEXP self__, SEXP c_arg__scalar_udf_xptr) {
+  SEXP res = savvy_InternalContext_register_scalar_udf__ffi(
+      self__, c_arg__scalar_udf_xptr);
+  return handle_result(res);
+}
+
+SEXP savvy_InternalContext_scalar_udf_xptr__impl(SEXP self__,
+                                                 SEXP c_arg__name) {
+  SEXP res = savvy_InternalContext_scalar_udf_xptr__ffi(self__, c_arg__name);
+  return handle_result(res);
+}
+
 SEXP savvy_InternalContext_sql__impl(SEXP self__, SEXP c_arg__query) {
   SEXP res = savvy_InternalContext_sql__ffi(self__, c_arg__query);
   return handle_result(res);
@@ -149,9 +170,10 @@ SEXP savvy_InternalDataFrame_to_arrow_schema__impl(SEXP self__,
   return handle_result(res);
 }
 
-SEXP savvy_InternalDataFrame_to_arrow_stream__impl(SEXP self__,
-                                                   SEXP c_arg__out) {
-  SEXP res = savvy_InternalDataFrame_to_arrow_stream__ffi(self__, c_arg__out);
+SEXP savvy_InternalDataFrame_to_arrow_stream__impl(
+    SEXP self__, SEXP c_arg__out, SEXP c_arg__requested_schema_xptr) {
+  SEXP res = savvy_InternalDataFrame_to_arrow_stream__ffi(
+      self__, c_arg__out, c_arg__requested_schema_xptr);
   return handle_result(res);
 }
 
@@ -166,6 +188,11 @@ SEXP savvy_InternalDataFrame_to_parquet__impl(
   return handle_result(res);
 }
 
+SEXP savvy_InternalDataFrame_to_provider__impl(SEXP self__) {
+  SEXP res = savvy_InternalDataFrame_to_provider__ffi(self__);
+  return handle_result(res);
+}
+
 SEXP savvy_InternalDataFrame_to_view__impl(SEXP self__, SEXP c_arg__ctx,
                                            SEXP c_arg__table_ref,
                                            SEXP c_arg__overwrite) {
@@ -183,12 +210,18 @@ static const R_CallMethodDef CallEntries[] = {
      (DL_FUNC)&savvy_sedonadb_adbc_init_func__impl, 0},
     {"savvy_InternalContext_data_frame_from_array_stream__impl",
      (DL_FUNC)&savvy_InternalContext_data_frame_from_array_stream__impl, 3},
+    {"savvy_InternalContext_data_frame_from_table_provider__impl",
+     (DL_FUNC)&savvy_InternalContext_data_frame_from_table_provider__impl, 2},
     {"savvy_InternalContext_deregister_table__impl",
      (DL_FUNC)&savvy_InternalContext_deregister_table__impl, 2},
     {"savvy_InternalContext_new__impl",
      (DL_FUNC)&savvy_InternalContext_new__impl, 0},
     {"savvy_InternalContext_read_parquet__impl",
      (DL_FUNC)&savvy_InternalContext_read_parquet__impl, 2},
+    {"savvy_InternalContext_register_scalar_udf__impl",
+     (DL_FUNC)&savvy_InternalContext_register_scalar_udf__impl, 2},
+    {"savvy_InternalContext_scalar_udf_xptr__impl",
+     (DL_FUNC)&savvy_InternalContext_scalar_udf_xptr__impl, 2},
     {"savvy_InternalContext_sql__impl",
      (DL_FUNC)&savvy_InternalContext_sql__impl, 2},
     {"savvy_InternalContext_view__impl",
@@ -208,9 +241,11 @@ static const R_CallMethodDef CallEntries[] = {
     {"savvy_InternalDataFrame_to_arrow_schema__impl",
      (DL_FUNC)&savvy_InternalDataFrame_to_arrow_schema__impl, 2},
     {"savvy_InternalDataFrame_to_arrow_stream__impl",
-     (DL_FUNC)&savvy_InternalDataFrame_to_arrow_stream__impl, 2},
+     (DL_FUNC)&savvy_InternalDataFrame_to_arrow_stream__impl, 3},
     {"savvy_InternalDataFrame_to_parquet__impl",
      (DL_FUNC)&savvy_InternalDataFrame_to_parquet__impl, 8},
+    {"savvy_InternalDataFrame_to_provider__impl",
+     (DL_FUNC)&savvy_InternalDataFrame_to_provider__impl, 1},
     {"savvy_InternalDataFrame_to_view__impl",
      (DL_FUNC)&savvy_InternalDataFrame_to_view__impl, 4},
     {NULL, NULL, 0}};
diff --git a/r/sedonadb/src/rust/Cargo.toml b/r/sedonadb/src/rust/Cargo.toml
index 98dc548..ced8c24 100644
--- a/r/sedonadb/src/rust/Cargo.toml
+++ b/r/sedonadb/src/rust/Cargo.toml
@@ -29,6 +29,7 @@ arrow-array = { workspace = true }
 datafusion = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-ffi = { workspace = true }
 savvy = "*"
 savvy-ffi = "*"
 sedona = { path = "../../../../rust/sedona" }
diff --git a/r/sedonadb/src/rust/api.h b/r/sedonadb/src/rust/api.h
index e3ba8a1..268c046 100644
--- a/r/sedonadb/src/rust/api.h
+++ b/r/sedonadb/src/rust/api.h
@@ -26,10 +26,15 @@ SEXP savvy_sedonadb_adbc_init_func__ffi(void);
 // methods and associated functions for InternalContext
 SEXP savvy_InternalContext_data_frame_from_array_stream__ffi(
     SEXP self__, SEXP c_arg__stream_xptr, SEXP c_arg__collect_now);
+SEXP savvy_InternalContext_data_frame_from_table_provider__ffi(
+    SEXP self__, SEXP c_arg__provider_xptr);
 SEXP savvy_InternalContext_deregister_table__ffi(SEXP self__,
                                                  SEXP c_arg__table_ref);
 SEXP savvy_InternalContext_new__ffi(void);
 SEXP savvy_InternalContext_read_parquet__ffi(SEXP self__, SEXP c_arg__paths);
+SEXP savvy_InternalContext_register_scalar_udf__ffi(
+    SEXP self__, SEXP c_arg__scalar_udf_xptr);
+SEXP savvy_InternalContext_scalar_udf_xptr__ffi(SEXP self__, SEXP c_arg__name);
 SEXP savvy_InternalContext_sql__ffi(SEXP self__, SEXP c_arg__query);
 SEXP savvy_InternalContext_view__ffi(SEXP self__, SEXP c_arg__table_ref);
 
@@ -43,11 +48,13 @@ SEXP savvy_InternalDataFrame_show__ffi(SEXP self__, SEXP c_arg__ctx,
                                        SEXP c_arg__width_chars,
                                        SEXP c_arg__ascii, SEXP c_arg__limit);
 SEXP savvy_InternalDataFrame_to_arrow_schema__ffi(SEXP self__, SEXP c_arg__out);
-SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(SEXP self__, SEXP c_arg__out);
+SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(
+    SEXP self__, SEXP c_arg__out, SEXP c_arg__requested_schema_xptr);
 SEXP savvy_InternalDataFrame_to_parquet__ffi(
     SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by,
     SEXP c_arg__sort_by, SEXP c_arg__single_file_output,
     SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version);
+SEXP savvy_InternalDataFrame_to_provider__ffi(SEXP self__);
 SEXP savvy_InternalDataFrame_to_view__ffi(SEXP self__, SEXP c_arg__ctx,
                                           SEXP c_arg__table_ref,
                                           SEXP c_arg__overwrite);
diff --git a/r/sedonadb/src/rust/src/context.rs b/r/sedonadb/src/rust/src/context.rs
index 67f52a3..a8c2c0e 100644
--- a/r/sedonadb/src/rust/src/context.rs
+++ b/r/sedonadb/src/rust/src/context.rs
@@ -16,13 +16,11 @@
 // under the License.
 use std::sync::Arc;
 
-use arrow_array::{
-    ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream},
-    RecordBatchReader,
-};
+use arrow_array::RecordBatchReader;
 use arrow_schema::ArrowError;
 use datafusion::catalog::{MemTable, TableProvider};
-use savvy::{savvy, savvy_err, Result};
+use datafusion_ffi::udf::FFI_ScalarUDF;
+use savvy::{savvy, savvy_err, IntoExtPtrSexp, Result};
 
 use sedona::{context::SedonaContext, record_batch_reader_provider::RecordBatchReaderProvider};
 use sedona_geoparquet::provider::GeoParquetReadOptions;
@@ -30,6 +28,7 @@ use tokio::runtime::Runtime;
 
 use crate::{
     dataframe::{new_data_frame, InternalDataFrame},
+    ffi::{import_array_stream, import_scalar_udf, import_table_provider, FFIScalarUdfR},
     runtime::wait_for_future_captured_r,
 };
 
@@ -94,14 +93,7 @@ impl InternalContext {
         stream_xptr: savvy::Sexp,
         collect_now: bool,
     ) -> savvy::Result<InternalDataFrame> {
-        let ffi_stream =
-            unsafe { savvy_ffi::R_ExternalPtrAddr(stream_xptr.0) as *mut FFI_ArrowArrayStream };
-        if ffi_stream.is_null() {
-            return Err(savvy_err!("external pointer to null in to_arrow_schema()"));
-        }
-
-        let stream = unsafe { FFI_ArrowArrayStream::from_raw(ffi_stream as _) };
-        let stream_reader = ArrowArrayStreamReader::try_new(stream)?;
+        let stream_reader = import_array_stream(stream_xptr)?;
 
         // Some readers are sensitive to being collected on the R thread or not, so
         // provide the option to collect everything immediately.
@@ -117,8 +109,37 @@ impl InternalContext {
         Ok(new_data_frame(inner, self.runtime.clone()))
     }
 
+    pub fn data_frame_from_table_provider(
+        &self,
+        provider_xptr: savvy::Sexp,
+    ) -> Result<InternalDataFrame> {
+        let provider = import_table_provider(provider_xptr)?;
+        let inner = self.inner.ctx.read_table(provider)?;
+        Ok(new_data_frame(inner, self.runtime.clone()))
+    }
+
     pub fn deregister_table(&self, table_ref: &str) -> savvy::Result<()> {
         self.inner.ctx.deregister_table(table_ref)?;
         Ok(())
     }
+
+    pub fn scalar_udf_xptr(&self, name: &str) -> savvy::Result<savvy::Sexp> {
+        if let Some(udf) = self.inner.ctx.state().scalar_functions().get(name) {
+            let ffi_scalar_udf: FFI_ScalarUDF = udf.clone().into();
+            let mut ffi_xptr = FFIScalarUdfR(ffi_scalar_udf).into_external_pointer();
+            unsafe { savvy_ffi::Rf_protect(ffi_xptr.0) };
+            let set_class_result = ffi_xptr.set_class(vec!["datafusion_scalar_udf"]);
+            unsafe { savvy_ffi::Rf_unprotect(1) };
+            set_class_result?; // propagate only after the protect stack is balanced
+            Ok(ffi_xptr)
+        } else {
+            Err(savvy_err!("Scalar UDF '{name}' was not found"))
+        }
+    }
+
+    pub fn register_scalar_udf(&self, scalar_udf_xptr: savvy::Sexp) -> savvy::Result<()> {
+        let scalar_udf = import_scalar_udf(scalar_udf_xptr)?;
+        self.inner.ctx.register_udf(scalar_udf);
+        Ok(())
+    }
 }
diff --git a/r/sedonadb/src/rust/src/dataframe.rs b/r/sedonadb/src/rust/src/dataframe.rs
index 909b2d7..a28fddc 100644
--- a/r/sedonadb/src/rust/src/dataframe.rs
+++ b/r/sedonadb/src/rust/src/dataframe.rs
@@ -24,7 +24,8 @@ use datafusion::catalog::MemTable;
 use datafusion::{logical_expr::SortExpr, prelude::DataFrame};
 use datafusion_common::Column;
 use datafusion_expr::Expr;
-use savvy::{savvy, savvy_err, Result};
+use datafusion_ffi::table_provider::FFI_TableProvider;
+use savvy::{savvy, savvy_err, IntoExtPtrSexp, Result};
 use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
 use sedona::reader::SedonaStreamReader;
 use sedona::show::{DisplayMode, DisplayTableOptions};
@@ -33,6 +34,7 @@ use sedona_schema::schema::SedonaSchema;
 use tokio::runtime::Runtime;
 
 use crate::context::InternalContext;
+use crate::ffi::{import_schema, FFITableProviderR};
 use crate::runtime::wait_for_future_captured_r;
 
 #[savvy]
@@ -86,12 +88,22 @@ impl InternalDataFrame {
         Ok(())
     }
 
-    fn to_arrow_stream(&self, out: savvy::Sexp) -> Result<()> {
+    fn to_arrow_stream(&self, out: savvy::Sexp, requested_schema_xptr: savvy::Sexp) -> Result<()> {
         let out_void = unsafe { savvy_ffi::R_ExternalPtrAddr(out.0) };
         if out_void.is_null() {
             return Err(savvy_err!("external pointer to null in to_arrow_stream()"));
         }
 
+        let maybe_requested_schema = if requested_schema_xptr.is_null() {
+            None
+        } else {
+            Some(import_schema(requested_schema_xptr))
+        };
+
+        if maybe_requested_schema.is_some() {
+            return Err(savvy_err!("Requested schema is not supported"));
+        }
+
         let inner = self.inner.clone();
         let stream =
             wait_for_future_captured_r(
@@ -109,6 +121,21 @@ impl InternalDataFrame {
         Ok(())
     }
 
+    fn to_provider(&self) -> Result<savvy::Sexp> {
+        let provider = self.inner.clone().into_view();
+        // Literal true is because the TableProvider that wraps this DataFrame
+        // can support filters being pushed down.
+        let ffi_provider =
+            FFI_TableProvider::new(provider, true, Some(self.runtime.handle().clone()));
+
+        let mut ffi_xptr = FFITableProviderR(ffi_provider).into_external_pointer();
+        unsafe { savvy_ffi::Rf_protect(ffi_xptr.0) };
+        let set_class_result = ffi_xptr.set_class(vec!["datafusion_table_provider"]);
+        unsafe { savvy_ffi::Rf_unprotect(1) };
+        set_class_result?; // propagate only after the protect stack is balanced
+        Ok(ffi_xptr)
+    }
+
     fn compute(&self, ctx: &InternalContext) -> Result<InternalDataFrame> {
         let schema = self.inner.schema();
         let batches =
diff --git a/r/sedonadb/src/rust/src/ffi.rs b/r/sedonadb/src/rust/src/ffi.rs
new file mode 100644
index 0000000..4275e26
--- /dev/null
+++ b/r/sedonadb/src/rust/src/ffi.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{
+    ffi::FFI_ArrowSchema,
+    ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream},
+};
+use arrow_schema::Schema;
+use datafusion::catalog::TableProvider;
+use datafusion_expr::ScalarUDF;
+use datafusion_ffi::{
+    table_provider::{FFI_TableProvider, ForeignTableProvider},
+    udf::{FFI_ScalarUDF, ForeignScalarUDF},
+};
+use savvy::{savvy_err, IntoExtPtrSexp};
+
+pub fn import_schema(mut xptr: savvy::Sexp) -> savvy::Result<Schema> {
+    let ffi_schema: &FFI_ArrowSchema = import_xptr(&mut xptr, "nanoarrow_schema")?;
+    let schema = Schema::try_from(ffi_schema)?;
+    Ok(schema)
+}
+
+pub fn import_array_stream(mut xptr: savvy::Sexp) -> savvy::Result<ArrowArrayStreamReader> {
+    let ffi_stream: &mut FFI_ArrowArrayStream = import_xptr(&mut xptr, "nanoarrow_array_stream")?;
+    let reader = unsafe { ArrowArrayStreamReader::from_raw(ffi_stream as _)? };
+    Ok(reader)
+}
+
+pub fn import_table_provider(
+    mut provider_xptr: savvy::Sexp,
+) -> savvy::Result<Arc<dyn TableProvider>> {
+    let ffi_provider: &FFI_TableProvider =
+        import_xptr(&mut provider_xptr, "datafusion_table_provider")?;
+    let provider_impl = ForeignTableProvider::from(ffi_provider);
+    Ok(Arc::new(provider_impl))
+}
+
+pub fn import_scalar_udf(mut scalar_udf_xptr: savvy::Sexp) -> savvy::Result<ScalarUDF> {
+    let ffi_scalar_udf_ref: &FFI_ScalarUDF =
+        import_xptr(&mut scalar_udf_xptr, "datafusion_scalar_udf")?;
+    let scalar_udf_impl = ForeignScalarUDF::try_from(ffi_scalar_udf_ref)?;
+    Ok(scalar_udf_impl.into())
+}
+
+fn import_xptr<'a, T>(xptr: &'a mut savvy::Sexp, cls: &str) -> savvy::Result<&'a mut T> {
+    if !xptr.is_external_pointer() {
+        return Err(savvy_err!(
+            "Expected external pointer with class {cls} but got a different R object"
+        ));
+    }
+
+    if !xptr
+        .get_class()
+        .map(|classes| classes.contains(&cls))
+        .unwrap_or(false)
+    {
+        return Err(savvy_err!(
+            "Expected external pointer of class {cls} but got external pointer with classes {:?}",
+            xptr.get_class()
+        ));
+    }
+
+    let typed_ptr = unsafe { savvy_ffi::R_ExternalPtrAddr(xptr.0) as *mut T };
+    if let Some(type_ref) = unsafe { typed_ptr.as_mut() } {
+        Ok(type_ref)
+    } else {
+        Err(savvy_err!("external pointer with class {cls} is null"))
+    }
+}
+
+#[repr(C)]
+pub struct FFIScalarUdfR(pub FFI_ScalarUDF);
+impl IntoExtPtrSexp for FFIScalarUdfR {}
+
+#[repr(C)]
+pub struct FFITableProviderR(pub FFI_TableProvider);
+impl IntoExtPtrSexp for FFITableProviderR {}
diff --git a/r/sedonadb/src/rust/src/lib.rs b/r/sedonadb/src/rust/src/lib.rs
index 727c36b..07c6f31 100644
--- a/r/sedonadb/src/rust/src/lib.rs
+++ b/r/sedonadb/src/rust/src/lib.rs
@@ -27,6 +27,7 @@ use sedona_proj::register::{configure_global_proj_engine, ProjCrsEngineBuilder};
 mod context;
 mod dataframe;
 mod error;
+mod ffi;
 mod runtime;
 
 #[savvy]
diff --git a/r/sedonadb/tests/testthat/test-context.R b/r/sedonadb/tests/testthat/test-context.R
index d0e1554..3050e09 100644
--- a/r/sedonadb/tests/testthat/test-context.R
+++ b/r/sedonadb/tests/testthat/test-context.R
@@ -40,6 +40,18 @@ test_that("views can be created and dropped", {
   expect_error(sd_view("foofy"), "No table named 'foofy'")
 })
 
+test_that("scalar udfs can be registered", {
+  udf <- ctx()$scalar_udf_xptr("st_point")
+  expect_s3_class(udf, "datafusion_scalar_udf")
+
+  sd_register_udf(udf)
+  df <- sd_sql("SELECT ST_Point(0, 1) as geom") |> sd_collect()
+  expect_identical(
+    wk::as_wkt(df$geom),
+    wk::wkt("POINT (0 1)")
+  )
+})
+
 test_that("configure_proj() errors for invalid inputs", {
   expect_error(
     sd_configure_proj("not a preset"),
diff --git a/r/sedonadb/tests/testthat/test-dataframe.R b/r/sedonadb/tests/testthat/test-dataframe.R
index a74eab3..0bc3c4c 100644
--- a/r/sedonadb/tests/testthat/test-dataframe.R
+++ b/r/sedonadb/tests/testthat/test-dataframe.R
@@ -53,6 +53,16 @@ test_that("dataframe can be created from nanoarrow objects", {
   expect_identical(sd_collect(df, ptype = r_df), r_df)
 })
 
+test_that("dataframe can be created from an FFI table provider", {
+  df <- as_sedonadb_dataframe(data.frame(one = 1, two = "two"))
+  provider <- df$df$to_provider()
+  df2 <- as_sedonadb_dataframe(provider)
+  expect_identical(
+    sd_collect(df2),
+    data.frame(one = 1, two = "two")
+  )
+})
+
 test_that("dataframe property accessors work", {
   df <- sd_sql("SELECT ST_Point(0, 1) as pt")
   expect_identical(ncol(df), 1L)
@@ -108,6 +118,11 @@ test_that("dataframe can be converted to an array stream", {
     as.data.frame(stream),
     data.frame(one = 1, two = "two")
   )
+
+  expect_error(
+    nanoarrow::as_nanoarrow_array_stream(df, schema = nanoarrow::na_int32()),
+    "Requested schema is not supported"
+  )
 })
 
 test_that("dataframe can be printed", {

Reply via email to