This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 396d8517 feat(r): Add bindings to IPC writer (#608)
396d8517 is described below

commit 396d8517d9efdc7b5af04e2850e174888955ba4c
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Sep 17 09:38:22 2024 -0500

    feat(r): Add bindings to IPC writer (#608)
    
    This PR adds a basic level of support for IPC writing in the R package.
    This is basically a thin wrapper around `ArrowIpcWriterWriteArrayStream()`
    and could be more feature-rich like the Python version (that allows
    schemas and batches to be written individually).
    
    I also added a bit of code to handle interrupts (which should catch
    interrupts on read and write; these weren't handled before).
    
    ``` r
    library(nanoarrow)
    tf <- tempfile()
    
    nycflights13::flights |> write_nanoarrow(tf)
    (df <- tf |> read_nanoarrow() |> tibble::as_tibble())
    #> # A tibble: 336,776 × 19
    #>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
    #>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
    #>  1  2013     1     1      517            515         2      830            819
    #>  2  2013     1     1      533            529         4      850            830
    #>  3  2013     1     1      542            540         2      923            850
    #>  4  2013     1     1      544            545        -1     1004           1022
    #>  5  2013     1     1      554            600        -6      812            837
    #>  6  2013     1     1      554            558        -4      740            728
    #>  7  2013     1     1      555            600        -5      913            854
    #>  8  2013     1     1      557            600        -3      709            723
    #>  9  2013     1     1      557            600        -3      838            846
    #> 10  2013     1     1      558            600        -2      753            745
    #> # ℹ 336,766 more rows
    #> # ℹ 11 more variables: arr_delay <dbl>, carrier <chr>, flight <int>,
    #> #   tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
    #> #   hour <dbl>, minute <dbl>, time_hour <dttm>
    identical(df, nycflights13::flights)
    #> [1] TRUE
    ```
    
    <sup>Created on 2024-09-14 with [reprex
    v2.1.1](https://reprex.tidyverse.org)</sup>
---
 r/DESCRIPTION               |   2 +-
 r/NAMESPACE                 |   3 +
 r/R/ipc.R                   |  72 +++++++++++++++++++--
 r/man/read_nanoarrow.Rd     |  18 ++++--
 r/src/init.c                |   5 ++
 r/src/ipc.c                 | 149 ++++++++++++++++++++++++++++++++++++++++++--
 r/src/util.c                |   7 ++-
 r/src/util.h                |   1 +
 r/tests/testthat/test-ipc.R |  95 +++++++++++++++++++++++++++-
 9 files changed, 331 insertions(+), 21 deletions(-)

diff --git a/r/DESCRIPTION b/r/DESCRIPTION
index d20eec34..addb1381 100644
--- a/r/DESCRIPTION
+++ b/r/DESCRIPTION
@@ -19,7 +19,7 @@ Description: Provides an 'R' interface to the 'nanoarrow' 'C' 
library and the
 License: Apache License (>= 2)
 Encoding: UTF-8
 Roxygen: list(markdown = TRUE)
-RoxygenNote: 7.2.3
+RoxygenNote: 7.3.2
 URL: https://arrow.apache.org/nanoarrow/latest/r/, 
https://github.com/apache/arrow-nanoarrow
 BugReports: https://github.com/apache/arrow-nanoarrow/issues
 Suggests:
diff --git a/r/NAMESPACE b/r/NAMESPACE
index aa6e71a0..d9fa3303 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -127,6 +127,8 @@ S3method(str,nanoarrow_array_stream)
 S3method(str,nanoarrow_buffer)
 S3method(str,nanoarrow_schema)
 S3method(str,nanoarrow_vctr)
+S3method(write_nanoarrow,character)
+S3method(write_nanoarrow,connection)
 export(array_stream_set_finalizer)
 export(as_nanoarrow_array)
 export(as_nanoarrow_array_extension)
@@ -210,6 +212,7 @@ export(read_nanoarrow)
 export(register_nanoarrow_extension)
 export(resolve_nanoarrow_extension)
 export(unregister_nanoarrow_extension)
+export(write_nanoarrow)
 importFrom(utils,getFromNamespace)
 importFrom(utils,str)
 useDynLib(nanoarrow, .registration = TRUE)
diff --git a/r/R/ipc.R b/r/R/ipc.R
index 29471b0f..14f49f92 100644
--- a/r/R/ipc.R
+++ b/r/R/ipc.R
@@ -15,17 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 
-#' Read serialized streams of Arrow data
+#' Read/write serialized streams of Arrow data
 #'
-#' Reads connections, file paths, URLs, or raw vectors of serialized Arrow
-#' data. Arrow documentation typically refers to this format as "Arrow IPC",
-#' since its origin was as a means to transmit tables between processes
+#' Reads/writes connections, file paths, URLs, or raw vectors from/to 
serialized
+#' Arrow data. Arrow documentation typically refers to this format as "Arrow
+#' IPC", since its origin was as a means to transmit tables between processes
 #' (e.g., multiple R sessions). This format can also be written to and read
 #' from files or URLs and is essentially a high performance equivalent of
 #' a CSV file that does a better job maintaining types.
 #'
-#' The nanoarrow package does not currently have the ability to write 
serialized
-#' IPC data: use [arrow::write_ipc_stream()] to write data from R, or use
+#' The nanoarrow package implements an IPC writer; however, you can also
+#' use [arrow::write_ipc_stream()] to write data from R, or use
 #' the equivalent writer from another Arrow implementation in Python, C++,
 #' Rust, JavaScript, Julia, C#, and beyond.
 #'
@@ -35,6 +35,8 @@
 #' @param x A `raw()` vector, connection, or file path from which to read
 #'   binary data. Common extensions indicating compression (.gz, .bz2, .zip)
 #'   are automatically uncompressed.
+#' @param data An object to write as an Arrow IPC stream, converted using
+#'   [as_nanoarrow_array_stream()]. Notably, this includes a [data.frame()].
 #' @param lazy By default, `read_nanoarrow()` will read and discard a copy of
 #'   the reader's schema to ensure that invalid streams are discovered as
 #'   soon as possible. Use `lazy = TRUE` to defer this check until the reader
@@ -107,6 +109,42 @@ read_nanoarrow.connection <- function(x, ..., lazy = 
FALSE) {
   check_stream_if_requested(reader, lazy)
 }
 
+#' @rdname read_nanoarrow
+#' @export
+write_nanoarrow <- function(data, x, ...) {
+  # S3 generic: dispatch happens on the destination `x`, not on the first
+  # argument `data`.
+  UseMethod("write_nanoarrow", x)
+}
+
+#' @export
+write_nanoarrow.connection <- function(data, x, ...) {
+  # A connection that arrives unopened is opened for binary writing here and
+  # closed again on exit; an already-open connection is left open.
+  needs_open <- !isOpen(x)
+  if (needs_open) {
+    open(x, "wb")
+    on.exit(close(x), add = TRUE)
+  }
+
+  writer <- .Call(nanoarrow_c_ipc_writer_connection, x)
+
+  # The stream created from `data` is released on exit regardless of whether
+  # the write succeeds.
+  stream <- as_nanoarrow_array_stream(data)
+  on.exit(nanoarrow_pointer_release(stream), add = TRUE)
+  .Call(nanoarrow_c_ipc_writer_write_stream, writer, stream)
+
+  # Return the input invisibly so the call can be used in a pipeline
+  invisible(data)
+}
+
+#' @export
+write_nanoarrow.character <- function(data, x, ...) {
+  # Only a single path/URL can be interpreted as a destination
+  if (length(x) != 1) {
+    stop(sprintf("Can't interpret character(%d) as file path", length(x)))
+  }
+
+  # guess_connection_type() gives the name of the connection constructor to
+  # use for `x`; "unz" connections are rejected for writing.
+  con_type <- guess_connection_type(x)
+  if (con_type == "unz") {
+    stop("zip compression not supported for write_nanoarrow()")
+  }
+
+  # This function creates the connection, so it also owns its lifetime.
+  # Registering close() before open() guarantees the connection is destroyed
+  # even if open() fails (e.g., the path is not writable); previously the
+  # connection was leaked in that case. Because the connection is already
+  # open when it reaches the connection method, that method will not close it.
+  con <- do.call(con_type, list(x))
+  on.exit(close(con), add = TRUE)
+  open(con, "wb")
+
+  write_nanoarrow(data, con)
+}
+
 #' @rdname read_nanoarrow
 #' @export
 example_ipc_stream <- function() {
@@ -205,3 +243,25 @@ guess_zip_filename <- function(x) {
 
   files
 }
+
+# R_tryCatch() at the C level does not provide for handling interrupts (or at
+# least no way to make it work was found), so readBin() and writeBin() are
+# called through wrappers that convert an interrupt condition into an error,
+# which the C code does know how to handle.
+read_bin_wrapper <- function(con, what, n) {
+  interrupt_to_error <- function(e) {
+    stop("user interrupt")
+  }
+
+  withCallingHandlers(readBin(con, what, n), interrupt = interrupt_to_error)
+}
+
+# Calls writeBin(), converting an interrupt condition raised during the write
+# into a regular error with the message "user interrupt".
+write_bin_wrapper <- function(object, con) {
+  interrupt_to_error <- function(e) {
+    stop("user interrupt")
+  }
+
+  withCallingHandlers(writeBin(object, con), interrupt = interrupt_to_error)
+}
diff --git a/r/man/read_nanoarrow.Rd b/r/man/read_nanoarrow.Rd
index 87d300c8..cf4d423e 100644
--- a/r/man/read_nanoarrow.Rd
+++ b/r/man/read_nanoarrow.Rd
@@ -2,11 +2,14 @@
 % Please edit documentation in R/ipc.R
 \name{read_nanoarrow}
 \alias{read_nanoarrow}
+\alias{write_nanoarrow}
 \alias{example_ipc_stream}
-\title{Read serialized streams of Arrow data}
+\title{Read/write serialized streams of Arrow data}
 \usage{
 read_nanoarrow(x, ..., lazy = FALSE)
 
+write_nanoarrow(data, x, ...)
+
 example_ipc_stream()
 }
 \arguments{
@@ -20,21 +23,24 @@ are automatically uncompressed.}
 the reader's schema to ensure that invalid streams are discovered as
 soon as possible. Use \code{lazy = TRUE} to defer this check until the reader
 is actually consumed.}
+
+\item{data}{An object to write as an Arrow IPC stream, converted using
+\code{\link[=as_nanoarrow_array_stream]{as_nanoarrow_array_stream()}}. 
Notably, this includes a \code{\link[=data.frame]{data.frame()}}.}
 }
 \value{
 A \link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}
 }
 \description{
-Reads connections, file paths, URLs, or raw vectors of serialized Arrow
-data. Arrow documentation typically refers to this format as "Arrow IPC",
-since its origin was as a means to transmit tables between processes
+Reads/writes connections, file paths, URLs, or raw vectors from/to serialized
+Arrow data. Arrow documentation typically refers to this format as "Arrow
+IPC", since its origin was as a means to transmit tables between processes
 (e.g., multiple R sessions). This format can also be written to and read
 from files or URLs and is essentially a high performance equivalent of
 a CSV file that does a better job maintaining types.
 }
 \details{
-The nanoarrow package does not currently have the ability to write serialized
-IPC data: use \code{\link[arrow:write_ipc_stream]{arrow::write_ipc_stream()}} 
to write data from R, or use
+The nanoarrow package implements an IPC writer; however, you can also
+use \code{\link[arrow:write_ipc_stream]{arrow::write_ipc_stream()}} to write 
data from R, or use
 the equivalent writer from another Arrow implementation in Python, C++,
 Rust, JavaScript, Julia, C#, and beyond.
 
diff --git a/r/src/init.c b/r/src/init.c
index 69c94391..58fe9f83 100644
--- a/r/src/init.c
+++ b/r/src/init.c
@@ -58,6 +58,8 @@ extern SEXP nanoarrow_c_infer_ptype(SEXP schema_xptr);
 extern SEXP nanoarrow_c_convert_array(SEXP array_xptr, SEXP ptype_sexp);
 extern SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr);
 extern SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con);
+extern SEXP nanoarrow_c_ipc_writer_connection(SEXP con);
+extern SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP 
array_stream_xptr);
 extern SEXP nanoarrow_c_allocate_schema(void);
 extern SEXP nanoarrow_c_allocate_array(void);
 extern SEXP nanoarrow_c_allocate_array_stream(void);
@@ -136,6 +138,9 @@ static const R_CallMethodDef CallEntries[] = {
      1},
     {"nanoarrow_c_ipc_array_reader_connection",
      (DL_FUNC)&nanoarrow_c_ipc_array_reader_connection, 1},
+    {"nanoarrow_c_ipc_writer_connection", 
(DL_FUNC)&nanoarrow_c_ipc_writer_connection, 1},
+    {"nanoarrow_c_ipc_writer_write_stream", 
(DL_FUNC)&nanoarrow_c_ipc_writer_write_stream,
+     2},
     {"nanoarrow_c_allocate_schema", (DL_FUNC)&nanoarrow_c_allocate_schema, 0},
     {"nanoarrow_c_allocate_array", (DL_FUNC)&nanoarrow_c_allocate_array, 0},
     {"nanoarrow_c_allocate_array_stream", 
(DL_FUNC)&nanoarrow_c_allocate_array_stream, 0},
diff --git a/r/src/ipc.c b/r/src/ipc.c
index 3039c7e6..396a25fc 100644
--- a/r/src/ipc.c
+++ b/r/src/ipc.c
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <stdint.h>
 #define R_NO_REMAP
 #include <R.h>
 #include <Rinternals.h>
@@ -48,6 +49,50 @@ static SEXP input_stream_owning_xptr(void) {
   return input_stream_xptr;
 }
 
+// Finalizer for an external pointer wrapping a struct ArrowIpcOutputStream:
+// calls the stream's release callback if one is still set, then frees the
+// struct itself. A NULL pointer address is ignored.
+static void finalize_output_stream_xptr(SEXP output_stream_xptr) {
+  struct ArrowIpcOutputStream* stream =
+      (struct ArrowIpcOutputStream*)R_ExternalPtrAddr(output_stream_xptr);
+  if (stream == NULL) {
+    return;
+  }
+
+  if (stream->release != NULL) {
+    stream->release(stream);
+  }
+
+  ArrowFree(stream);
+}
+
+// Allocates a struct ArrowIpcOutputStream whose release callback is NULL and
+// wraps it in an external pointer; finalize_output_stream_xptr() is
+// registered as the finalizer so the struct is released and freed when the
+// external pointer is garbage collected.
+static SEXP output_stream_owning_xptr(void) {
+  struct ArrowIpcOutputStream* output_stream =
+      (struct ArrowIpcOutputStream*)ArrowMalloc(sizeof(struct ArrowIpcOutputStream));
+  // Raise an R error instead of dereferencing NULL if the allocation failed
+  if (output_stream == NULL) {
+    Rf_error("Failed to allocate ArrowIpcOutputStream");
+  }
+
+  // A NULL release callback marks the stream as not (yet) initialized
+  output_stream->release = NULL;
+  SEXP output_stream_xptr =
+      PROTECT(R_MakeExternalPtr(output_stream, R_NilValue, R_NilValue));
+  R_RegisterCFinalizer(output_stream_xptr, &finalize_output_stream_xptr);
+  UNPROTECT(1);
+  return output_stream_xptr;
+}
+
+// Finalizer for an external pointer wrapping a struct ArrowIpcWriter: resets
+// the writer if its private_data is still set, then frees the struct itself.
+// A NULL pointer address is ignored.
+static void finalize_writer_xptr(SEXP writer_xptr) {
+  struct ArrowIpcWriter* ipc_writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr);
+  if (ipc_writer == NULL) {
+    return;
+  }
+
+  if (ipc_writer->private_data != NULL) {
+    ArrowIpcWriterReset(ipc_writer);
+  }
+
+  ArrowFree(ipc_writer);
+}
+
+// Allocates a struct ArrowIpcWriter whose private_data is NULL and wraps it in
+// an external pointer; finalize_writer_xptr() is registered as the finalizer
+// so the writer is reset and freed when the external pointer is garbage
+// collected.
+static SEXP writer_owning_xptr(void) {
+  struct ArrowIpcWriter* writer =
+      (struct ArrowIpcWriter*)ArrowMalloc(sizeof(struct ArrowIpcWriter));
+  // Raise an R error instead of dereferencing NULL if the allocation failed
+  if (writer == NULL) {
+    Rf_error("Failed to allocate ArrowIpcWriter");
+  }
+
+  // NULL private_data tells the finalizer there is nothing to reset yet
+  writer->private_data = NULL;
+  SEXP writer_xptr = PROTECT(R_MakeExternalPtr(writer, R_NilValue, R_NilValue));
+  R_RegisterCFinalizer(writer_xptr, &finalize_writer_xptr);
+  UNPROTECT(1);
+  return writer_xptr;
+}
+
 SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr) {
   struct ArrowBuffer* buffer = buffer_from_xptr(buffer_xptr);
 
@@ -82,7 +127,7 @@ struct ConnectionInputStreamHandler {
   int return_code;
 };
 
-static SEXP handle_readbin_error(SEXP cond, void* hdata) {
+static SEXP handle_readbin_writebin_error(SEXP cond, void* hdata) {
   struct ConnectionInputStreamHandler* data = (struct 
ConnectionInputStreamHandler*)hdata;
 
   SEXP fun = PROTECT(Rf_install("conditionMessage"));
@@ -103,7 +148,7 @@ static SEXP call_readbin(void* hdata) {
   SEXP n = PROTECT(Rf_ScalarReal((double)data->buf_size_bytes));
   SEXP call = PROTECT(Rf_lang4(nanoarrow_sym_readbin, data->con, 
nanoarrow_ptype_raw, n));
 
-  SEXP result = PROTECT(Rf_eval(call, R_BaseEnv));
+  SEXP result = PROTECT(Rf_eval(call, nanoarrow_ns_pkg));
   R_xlen_t bytes_read = Rf_xlength(result);
   memcpy(data->buf, RAW(result), bytes_read);
   *(data->size_read_out) = bytes_read;
@@ -112,6 +157,36 @@ static SEXP call_readbin(void* hdata) {
   return R_NilValue;
 }
 
+// Body callback for R_tryCatchError(): writes data->buf_size_bytes bytes
+// starting at data->buf to the connection data->con by evaluating calls to
+// nanoarrow_sym_writebin in the package namespace. data->buf is advanced and
+// data->buf_size_bytes reduced as full chunks are written. Always returns
+// R_NilValue.
+static SEXP call_writebin(void* hdata) {
+  struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata;
+
+  // Write 16MB chunks. This is a balance between being small enough not to
+  // copy too much of the source unnecessarily and big enough to avoid
+  // unnecessary R evaluation overhead.
+  const int64_t chunk_buffer_size = 16777216;
+
+  // Only allocate the full-size chunk buffer when at least one full chunk
+  // will actually be written; otherwise every small write would allocate
+  // (and discard) a 16MB raw vector.
+  if (data->buf_size_bytes > chunk_buffer_size) {
+    SEXP chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, chunk_buffer_size));
+    SEXP call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con));
+
+    // The same buffer and call object are reused for every full chunk
+    while (data->buf_size_bytes > chunk_buffer_size) {
+      memcpy(RAW(chunk_buffer), data->buf, chunk_buffer_size);
+      Rf_eval(call, nanoarrow_ns_pkg);
+      data->buf_size_bytes -= chunk_buffer_size;
+      data->buf += chunk_buffer_size;
+    }
+
+    UNPROTECT(2);
+  }
+
+  // Write remaining bytes using a buffer of exactly the remaining size
+  if (data->buf_size_bytes > 0) {
+    SEXP chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, data->buf_size_bytes));
+    SEXP call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con));
+    memcpy(RAW(chunk_buffer), data->buf, data->buf_size_bytes);
+    Rf_eval(call, nanoarrow_ns_pkg);
+    UNPROTECT(2);
+  }
+
+  return R_NilValue;
+}
+
 static ArrowErrorCode read_con_input_stream(struct ArrowIpcInputStream* stream,
                                             uint8_t* buf, int64_t 
buf_size_bytes,
                                             int64_t* size_read_out,
@@ -129,7 +204,32 @@ static ArrowErrorCode read_con_input_stream(struct 
ArrowIpcInputStream* stream,
   data.error = error;
   data.return_code = NANOARROW_OK;
 
-  R_tryCatchError(&call_readbin, &data, &handle_readbin_error, &data);
+  R_tryCatchError(&call_readbin, &data, &handle_readbin_writebin_error, &data);
+  return data.return_code;
+}
+
+// ArrowIpcOutputStream write callback that writes buf_size_bytes bytes from
+// buf to the R connection stored in stream->private_data.
+//
+// Returns EIO (with the error message set) when called from a thread other
+// than the main R thread. Otherwise call_writebin() is run under
+// R_tryCatchError() and the return code recorded in the handler data is
+// returned; *size_write_out is set to buf_size_bytes.
+static ArrowErrorCode write_con_output_stream(struct ArrowIpcOutputStream* stream,
+                                              const void* buf, int64_t buf_size_bytes,
+                                              int64_t* size_write_out,
+                                              struct ArrowError* error) {
+  if (!nanoarrow_is_main_thread()) {
+    // This is the write path: the message previously said "read from"
+    ArrowErrorSet(error, "Can't write to R connection on a non-R thread");
+    return EIO;
+  }
+
+  // The handler struct is shared with the read path; size_read_out is unused
+  // when writing.
+  struct ConnectionInputStreamHandler data;
+  data.con = (SEXP)stream->private_data;
+  data.buf = (void*)buf;
+  data.buf_size_bytes = buf_size_bytes;
+  data.size_read_out = NULL;
+  data.error = error;
+  data.return_code = NANOARROW_OK;
+
+  R_tryCatchError(&call_writebin, &data, &handle_readbin_writebin_error, &data);
+
+  // This implementation always blocks until all bytes have been written
+  *size_write_out = buf_size_bytes;
+
+  return data.return_code;
+}
 
@@ -137,6 +237,10 @@ static void release_con_input_stream(struct 
ArrowIpcInputStream* stream) {
   nanoarrow_release_sexp((SEXP)stream->private_data);
 }
 
+// ArrowIpcOutputStream release callback: hands the connection stored in
+// private_data to nanoarrow_release_sexp().
+static void release_con_output_stream(struct ArrowIpcOutputStream* stream) {
+  SEXP con = (SEXP)stream->private_data;
+  nanoarrow_release_sexp(con);
+}
+
 SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) {
   SEXP array_stream_xptr = PROTECT(nanoarrow_array_stream_owning_xptr());
   struct ArrowArrayStream* array_stream =
@@ -153,9 +257,46 @@ SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) {
 
   int code = ArrowIpcArrayStreamReaderInit(array_stream, input_stream, NULL);
   if (code != NANOARROW_OK) {
-    Rf_error("ArrowIpcArrayStreamReaderInit() failed");
+    Rf_error("ArrowIpcArrayStreamReaderInit() failed with errno %d", code);
   }
 
   UNPROTECT(2);
   return array_stream_xptr;
 }
+
+// Creates an ArrowIpcWriter that writes to the R connection `con` and returns
+// it wrapped in an owning external pointer. Raises an R error if
+// ArrowIpcWriterInit() fails.
+SEXP nanoarrow_c_ipc_writer_connection(SEXP con) {
+  SEXP stream_xptr = PROTECT(output_stream_owning_xptr());
+  struct ArrowIpcOutputStream* stream =
+      (struct ArrowIpcOutputStream*)R_ExternalPtrAddr(stream_xptr);
+
+  // The connection becomes the stream's private data. It is passed to
+  // nanoarrow_preserve_sexp() here and to nanoarrow_release_sexp() by the
+  // stream's release callback.
+  stream->write = &write_con_output_stream;
+  stream->release = &release_con_output_stream;
+  stream->private_data = (SEXP)con;
+  nanoarrow_preserve_sexp(con);
+
+  SEXP writer_xptr = PROTECT(writer_owning_xptr());
+  struct ArrowIpcWriter* ipc_writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr);
+
+  int status = ArrowIpcWriterInit(ipc_writer, stream);
+  if (status != NANOARROW_OK) {
+    Rf_error("ArrowIpcWriterInit() failed with errno %d", status);
+  }
+
+  UNPROTECT(2);
+  return writer_xptr;
+}
+
+// Passes the array stream held by array_stream_xptr to
+// ArrowIpcWriterWriteArrayStream() for the writer held by writer_xptr.
+// Raises an R error containing the nanoarrow error message on failure;
+// returns R_NilValue otherwise.
+SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP array_stream_xptr) {
+  struct ArrowIpcWriter* ipc_writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr);
+  struct ArrowArrayStream* source = nanoarrow_array_stream_from_xptr(array_stream_xptr);
+
+  struct ArrowError error;
+  ArrowErrorInit(&error);
+  if (ArrowIpcWriterWriteArrayStream(ipc_writer, source, &error) != NANOARROW_OK) {
+    Rf_error("ArrowIpcWriterWriteArrayStream() failed: %s", error.message);
+  }
+
+  return R_NilValue;
+}
diff --git a/r/src/util.c b/r/src/util.c
index 6d4035ba..56d9d05a 100644
--- a/r/src/util.c
+++ b/r/src/util.c
@@ -30,6 +30,7 @@ SEXP nanoarrow_cls_schema = NULL;
 SEXP nanoarrow_cls_array_stream = NULL;
 SEXP nanoarrow_cls_buffer = NULL;
 SEXP nanoarrow_sym_readbin = NULL;
+SEXP nanoarrow_sym_writebin = NULL;
 SEXP nanoarrow_ptype_raw = NULL;
 
 void nanoarrow_init_cached_sexps(void) {
@@ -42,7 +43,8 @@ void nanoarrow_init_cached_sexps(void) {
   nanoarrow_cls_schema = PROTECT(Rf_mkString("nanoarrow_schema"));
   nanoarrow_cls_array_stream = PROTECT(Rf_mkString("nanoarrow_array_stream"));
   nanoarrow_cls_buffer = PROTECT(Rf_mkString("nanoarrow_buffer"));
-  nanoarrow_sym_readbin = PROTECT(Rf_install("readBin"));
+  nanoarrow_sym_readbin = PROTECT(Rf_install("read_bin_wrapper"));
+  nanoarrow_sym_writebin = PROTECT(Rf_install("write_bin_wrapper"));
   nanoarrow_ptype_raw = PROTECT(Rf_allocVector(RAWSXP, 0));
 
   R_PreserveObject(nanoarrow_ns_pkg);
@@ -54,9 +56,10 @@ void nanoarrow_init_cached_sexps(void) {
   R_PreserveObject(nanoarrow_cls_array_stream);
   R_PreserveObject(nanoarrow_cls_buffer);
   R_PreserveObject(nanoarrow_sym_readbin);
+  R_PreserveObject(nanoarrow_sym_writebin);
   R_PreserveObject(nanoarrow_ptype_raw);
 
-  UNPROTECT(11);
+  UNPROTECT(12);
 }
 
 SEXP nanoarrow_c_preserved_count(void) {
diff --git a/r/src/util.h b/r/src/util.h
index d652330e..23a9f2ed 100644
--- a/r/src/util.h
+++ b/r/src/util.h
@@ -32,6 +32,7 @@ extern SEXP nanoarrow_cls_schema;
 extern SEXP nanoarrow_cls_array_stream;
 extern SEXP nanoarrow_cls_buffer;
 extern SEXP nanoarrow_sym_readbin;
+extern SEXP nanoarrow_sym_writebin;
 extern SEXP nanoarrow_ptype_raw;
 
 void nanoarrow_init_cached_sexps(void);
diff --git a/r/tests/testthat/test-ipc.R b/r/tests/testthat/test-ipc.R
index 85adf0f0..3944de4b 100644
--- a/r/tests/testthat/test-ipc.R
+++ b/r/tests/testthat/test-ipc.R
@@ -36,6 +36,20 @@ test_that("read_nanoarrow() works for open connections", {
   )
 })
 
+test_that("write_nanoarrow() works for open connections", {
+  # The destination is an in-memory raw connection, so no temporary file is
+  # needed. (Previously an unused tempfile cleanup was registered and then
+  # silently replaced by a second on.exit() call without add = TRUE.)
+  con <- rawConnection(raw(), "wb")
+  on.exit(close(con), add = TRUE)
+
+  # An empty data frame should survive a write/read round trip
+  write_nanoarrow(data.frame(), con)
+  expect_identical(
+    as.data.frame(read_nanoarrow(rawConnectionValue(con))),
+    data.frame()
+  )
+})
+
 test_that("read_nanoarrow() works for unopened connections", {
   tf <- tempfile()
   on.exit(unlink(tf))
@@ -56,6 +70,20 @@ test_that("read_nanoarrow() works for unopened connections", 
{
   )
 })
 
+test_that("write_nanoarrow() works for unopened connections", {
+  path <- tempfile()
+  on.exit(unlink(path))
+
+  # Deliberately not closed on exit: write_nanoarrow() is supposed to close a
+  # connection that it had to open itself
+  unopened <- file(path)
+  write_nanoarrow(data.frame(), unopened)
+
+  # Closing again must fail because the connection no longer exists
+  expect_error(close(unopened), "invalid connection")
+})
+
 test_that("read_nanoarrow() works for file paths", {
   tf <- tempfile()
   on.exit(unlink(tf))
@@ -71,6 +99,15 @@ test_that("read_nanoarrow() works for file paths", {
   )
 })
 
+test_that("write_nanoarrow() works for file paths", {
+  path <- tempfile()
+  on.exit(unlink(path))
+
+  expected <- data.frame(letters = letters)
+
+  # The input is returned and the file round-trips to the same data frame
+  returned <- write_nanoarrow(expected, path)
+  expect_identical(returned, expected)
+
+  roundtrip <- as.data.frame(read_nanoarrow(path))
+  expect_identical(roundtrip, expected)
+})
+
 test_that("read_nanoarrow() works for URLs", {
   tf <- tempfile()
   on.exit(unlink(tf))
@@ -86,6 +123,15 @@ test_that("read_nanoarrow() works for URLs", {
   )
 })
 
+test_that("write_nanoarrow() works for URLs", {
+  path <- tempfile()
+  on.exit(unlink(path))
+
+  expected <- data.frame(letters = letters)
+
+  # Write through a file:// URL, then read back from the plain path
+  file_url <- paste0("file://", path)
+  returned <- write_nanoarrow(expected, file_url)
+  expect_identical(returned, expected)
+
+  roundtrip <- as.data.frame(read_nanoarrow(path))
+  expect_identical(roundtrip, expected)
+})
+
 test_that("read_nanoarrow() works for compressed .gz file paths", {
   tf <- tempfile(fileext = ".gz")
   on.exit(unlink(tf))
@@ -101,6 +147,15 @@ test_that("read_nanoarrow() works for compressed .gz file 
paths", {
   )
 })
 
+test_that("write_nanoarrow() works for compressed .gz file paths", {
+  gz_path <- tempfile(fileext = ".gz")
+  on.exit(unlink(gz_path))
+
+  expected <- data.frame(letters = letters)
+
+  # The input is returned and the .gz file round-trips to the same data frame
+  returned <- write_nanoarrow(expected, gz_path)
+  expect_identical(returned, expected)
+
+  roundtrip <- as.data.frame(read_nanoarrow(gz_path))
+  expect_identical(roundtrip, expected)
+})
+
 test_that("read_nanoarrow() works for compressed .bz2 file paths", {
   tf <- tempfile(fileext = ".bz2")
   on.exit(unlink(tf))
@@ -116,6 +171,15 @@ test_that("read_nanoarrow() works for compressed .bz2 file 
paths", {
   )
 })
 
+test_that("write_nanoarrow() works for compressed .bz2 file paths", {
+  bz2_path <- tempfile(fileext = ".bz2")
+  on.exit(unlink(bz2_path))
+
+  expected <- data.frame(letters = letters)
+
+  # The input is returned and the .bz2 file round-trips to the same data frame
+  returned <- write_nanoarrow(expected, bz2_path)
+  expect_identical(returned, expected)
+
+  roundtrip <- as.data.frame(read_nanoarrow(bz2_path))
+  expect_identical(roundtrip, expected)
+})
+
 test_that("read_nanoarrow() works for compressed .zip file paths", {
   tf <- tempfile(fileext = ".zip")
   tdir <- tempfile()
@@ -144,6 +208,17 @@ test_that("read_nanoarrow() works for compressed .zip file 
paths", {
   )
 })
 
+test_that("write_nanoarrow() errors for compressed .zip file paths", {
+  zip_path <- tempfile(fileext = ".zip")
+  on.exit(unlink(zip_path))
+
+  # Writing to a .zip path is rejected with an explicit error
+  input <- data.frame(letters = letters)
+  expect_error(write_nanoarrow(input, zip_path), "zip compression not supported")
+})
+
 test_that("read_nanoarrow() errors for compressed URL paths", {
   expect_error(
     read_nanoarrow("https://something.zip"),
@@ -151,11 +226,15 @@ test_that("read_nanoarrow() errors for compressed URL 
paths", {
   )
 })
 
-test_that("read_nanoarrow() errors for input with length != 1", {
+test_that("read|write_nanoarrow() errors for input with length != 1", {
   expect_error(
     read_nanoarrow(character(0)),
     "Can't interpret character"
   )
+  expect_error(
+    write_nanoarrow(data.frame(), character(0)),
+    "Can't interpret character"
+  )
 })
 
 test_that("read_nanoarrow() errors zip archives that contain files != 1", {
@@ -187,7 +266,7 @@ test_that("read_nanoarrow() reports errors from readBin", {
   writeLines("this is not a binary file", tf)
 
   con <- file(tf, open = "r")
-  on.exit(close(con))
+  on.exit(close(con), add = TRUE)
 
   expect_error(
     read_nanoarrow(con),
@@ -195,6 +274,18 @@ test_that("read_nanoarrow() reports errors from readBin", {
   )
 })
 
+test_that("write_nanoarrow() reports errors from writeBin", {
+  path <- tempfile()
+  on.exit(unlink(path))
+
+  # The connection is opened in text mode ("w"); the write is expected to
+  # fail and the failure to surface as an R error
+  text_con <- file(path, open = "w")
+  on.exit(close(text_con), add = TRUE)
+
+  expect_error(write_nanoarrow(data.frame(), text_con), "R execution error")
+})
+
 test_that("read_nanoarrow() respects lazy argument", {
   expect_error(
     read_nanoarrow(raw(0), lazy = FALSE),

Reply via email to