This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 396d8517 feat(r): Add bindings to IPC writer (#608)
396d8517 is described below
commit 396d8517d9efdc7b5af04e2850e174888955ba4c
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Sep 17 09:38:22 2024 -0500
feat(r): Add bindings to IPC writer (#608)
This PR adds a basic level of support for IPC writing in the R package.
This is a thin wrapper around `ArrowIpcWriterWriteArrayStream()`
and could be made more feature-rich, like the Python version (which allows
schemas and batches to be written individually).
I also added code to handle interrupts, which were not handled before;
interrupts should now be caught during both reads and writes.
``` r
library(nanoarrow)
tf <- tempfile()
nycflights13::flights |> write_nanoarrow(tf)
(df <- tf |> read_nanoarrow() |> tibble::as_tibble())
#> # A tibble: 336,776 × 19
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> <int> <int> <int> <int> <int> <dbl> <int> <int>
#> 1 2013 1 1 517 515 2 830 819
#> 2 2013 1 1 533 529 4 850 830
#> 3 2013 1 1 542 540 2 923 850
#> 4 2013 1 1 544 545 -1 1004 1022
#> 5 2013 1 1 554 600 -6 812 837
#> 6 2013 1 1 554 558 -4 740 728
#> 7 2013 1 1 555 600 -5 913 854
#> 8 2013 1 1 557 600 -3 709 723
#> 9 2013 1 1 557 600 -3 838 846
#> 10 2013 1 1 558 600 -2 753 745
#> # ℹ 336,766 more rows
#> # ℹ 11 more variables: arr_delay <dbl>, carrier <chr>, flight <int>,
#> # tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
#> # hour <dbl>, minute <dbl>, time_hour <dttm>
identical(df, nycflights13::flights)
#> [1] TRUE
```
<sup>Created on 2024-09-14 with [reprex v2.1.1](https://reprex.tidyverse.org)</sup>
---
r/DESCRIPTION | 2 +-
r/NAMESPACE | 3 +
r/R/ipc.R | 72 +++++++++++++++++++--
r/man/read_nanoarrow.Rd | 18 ++++--
r/src/init.c | 5 ++
r/src/ipc.c | 149 ++++++++++++++++++++++++++++++++++++++++++--
r/src/util.c | 7 ++-
r/src/util.h | 1 +
r/tests/testthat/test-ipc.R | 95 +++++++++++++++++++++++++++-
9 files changed, 331 insertions(+), 21 deletions(-)
diff --git a/r/DESCRIPTION b/r/DESCRIPTION
index d20eec34..addb1381 100644
--- a/r/DESCRIPTION
+++ b/r/DESCRIPTION
@@ -19,7 +19,7 @@ Description: Provides an 'R' interface to the 'nanoarrow' 'C'
library and the
License: Apache License (>= 2)
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
-RoxygenNote: 7.2.3
+RoxygenNote: 7.3.2
URL: https://arrow.apache.org/nanoarrow/latest/r/,
https://github.com/apache/arrow-nanoarrow
BugReports: https://github.com/apache/arrow-nanoarrow/issues
Suggests:
diff --git a/r/NAMESPACE b/r/NAMESPACE
index aa6e71a0..d9fa3303 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -127,6 +127,8 @@ S3method(str,nanoarrow_array_stream)
S3method(str,nanoarrow_buffer)
S3method(str,nanoarrow_schema)
S3method(str,nanoarrow_vctr)
+S3method(write_nanoarrow,character)
+S3method(write_nanoarrow,connection)
export(array_stream_set_finalizer)
export(as_nanoarrow_array)
export(as_nanoarrow_array_extension)
@@ -210,6 +212,7 @@ export(read_nanoarrow)
export(register_nanoarrow_extension)
export(resolve_nanoarrow_extension)
export(unregister_nanoarrow_extension)
+export(write_nanoarrow)
importFrom(utils,getFromNamespace)
importFrom(utils,str)
useDynLib(nanoarrow, .registration = TRUE)
diff --git a/r/R/ipc.R b/r/R/ipc.R
index 29471b0f..14f49f92 100644
--- a/r/R/ipc.R
+++ b/r/R/ipc.R
@@ -15,17 +15,17 @@
# specific language governing permissions and limitations
# under the License.
-#' Read serialized streams of Arrow data
+#' Read/write serialized streams of Arrow data
#'
-#' Reads connections, file paths, URLs, or raw vectors of serialized Arrow
-#' data. Arrow documentation typically refers to this format as "Arrow IPC",
-#' since its origin was as a means to transmit tables between processes
+#' Reads serialized Arrow data from connections, file paths, URLs, or raw
+#' vectors, and writes it to connections or file paths (including `file://`
+#' URLs). Arrow documentation typically refers to this format as "Arrow IPC",
+#' since its origin was as a means to transmit tables between processes
#' (e.g., multiple R sessions). This format can also be written to and read
#' from files or URLs and is essentially a high performance equivalent of
#' a CSV file that does a better job maintaining types.
#'
-#' The nanoarrow package does not currently have the ability to write
serialized
-#' IPC data: use [arrow::write_ipc_stream()] to write data from R, or use
+#' The nanoarrow package implements an IPC writer; however, you can also
+#' use [arrow::write_ipc_stream()] to write data from R, or use
#' the equivalent writer from another Arrow implementation in Python, C++,
#' Rust, JavaScript, Julia, C#, and beyond.
#'
@@ -35,6 +35,8 @@
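-#' @param x A `raw()` vector, connection, or file path from which to read
-#' binary data. Common extensions indicating compression (.gz, .bz2, .zip)
-#' are automatically uncompressed.
+#' @param x For `read_nanoarrow()`, a `raw()` vector, connection, or file path
+#' from which to read binary data. For `write_nanoarrow()`, a connection or
+#' file path to which binary data should be written. Common extensions
+#' indicating compression (.gz, .bz2, .zip) are automatically uncompressed
+#' when reading; .gz and .bz2 paths are compressed when writing (.zip is not
+#' supported for writing).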
+#' @param data An object to write as an Arrow IPC stream, converted using
+#' [as_nanoarrow_array_stream()]. Notably, this includes a [data.frame()].
#' @param lazy By default, `read_nanoarrow()` will read and discard a copy of
#' the reader's schema to ensure that invalid streams are discovered as
#' soon as possible. Use `lazy = TRUE` to defer this check until the reader
@@ -107,6 +109,42 @@ read_nanoarrow.connection <- function(x, ..., lazy =
FALSE) {
check_stream_if_requested(reader, lazy)
}
+#' @rdname read_nanoarrow
+#' @export
+write_nanoarrow <- function(data, x, ...) {
+  # S3 dispatch happens on the destination `x` (a connection or a file path),
+  # not on the object being written
+  UseMethod(generic = "write_nanoarrow", object = x)
+}
+
+#' @export
+write_nanoarrow.connection <- function(data, x, ...) {
+  # A connection that arrives unopened is owned by this method: it is opened
+  # for binary writing here and closed (destroyed) on exit. The exit handler
+  # is registered before open() so that a failed open does not leak it.
+  owns_connection <- !isOpen(x)
+  if (owns_connection) {
+    on.exit(close(x))
+  }
+
+  # Convert before opening the connection so that input that can't be
+  # converted doesn't truncate an existing file
+  stream <- as_nanoarrow_array_stream(data)
+  on.exit(nanoarrow_pointer_release(stream), add = TRUE)
+
+  if (owns_connection) {
+    open(x, "wb")
+  }
+
+  writer <- .Call(nanoarrow_c_ipc_writer_connection, x)
+  .Call(nanoarrow_c_ipc_writer_write_stream, writer, stream)
+  invisible(data)
+}
+
+#' @export
+write_nanoarrow.character <- function(data, x, ...) {
+  n_paths <- length(x)
+  if (n_paths != 1) {
+    stop(sprintf("Can't interpret character(%d) as file path", n_paths))
+  }
+
+  # .zip archives can be read by read_nanoarrow() but cannot be written
+  connection_type <- guess_connection_type(x)
+  if (connection_type == "unz") {
+    stop("zip compression not supported for write_nanoarrow()")
+  }
+
+  # connection_type names a connection constructor; the resulting unopened
+  # connection is opened and closed by write_nanoarrow.connection()
+  con <- do.call(connection_type, list(x))
+  write_nanoarrow(data, con)
+}
+
#' @rdname read_nanoarrow
#' @export
example_ipc_stream <- function() {
@@ -205,3 +243,25 @@ guess_zip_filename <- function(x) {
files
}
+
+# The C-level R_tryCatch() does not provide for handling interrupts (or
+# I couldn't figure out how to make it work), so instead the C code calls
+# readBin() and writeBin() through the wrappers below, which convert interrupt
+# conditions to errors (which the C code does know how to handle).
+convert_interrupt_to_error <- function(expr) {
+  # `expr` is a promise, so it is evaluated inside withCallingHandlers() and
+  # any interrupt raised while evaluating it reaches the handler
+  withCallingHandlers(
+    expr,
+    interrupt = function(cnd) stop("user interrupt")
+  )
+}
+
+read_bin_wrapper <- function(con, what, n) {
+  convert_interrupt_to_error(readBin(con, what, n))
+}
+
+write_bin_wrapper <- function(object, con) {
+  convert_interrupt_to_error(writeBin(object, con))
+}
diff --git a/r/man/read_nanoarrow.Rd b/r/man/read_nanoarrow.Rd
index 87d300c8..cf4d423e 100644
--- a/r/man/read_nanoarrow.Rd
+++ b/r/man/read_nanoarrow.Rd
@@ -2,11 +2,14 @@
% Please edit documentation in R/ipc.R
\name{read_nanoarrow}
\alias{read_nanoarrow}
+\alias{write_nanoarrow}
\alias{example_ipc_stream}
-\title{Read serialized streams of Arrow data}
+\title{Read/write serialized streams of Arrow data}
\usage{
read_nanoarrow(x, ..., lazy = FALSE)
+write_nanoarrow(data, x, ...)
+
example_ipc_stream()
}
\arguments{
@@ -20,21 +23,24 @@ are automatically uncompressed.}
the reader's schema to ensure that invalid streams are discovered as
soon as possible. Use \code{lazy = TRUE} to defer this check until the reader
is actually consumed.}
+
+\item{data}{An object to write as an Arrow IPC stream, converted using
+\code{\link[=as_nanoarrow_array_stream]{as_nanoarrow_array_stream()}}.
Notably, this includes a \code{\link[=data.frame]{data.frame()}}.}
}
\value{
A \link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}
}
\description{
-Reads connections, file paths, URLs, or raw vectors of serialized Arrow
-data. Arrow documentation typically refers to this format as "Arrow IPC",
-since its origin was as a means to transmit tables between processes
+Reads serialized Arrow data from connections, file paths, URLs, or raw
+vectors, and writes it to connections or file paths (including \verb{file://}
+URLs). Arrow documentation typically refers to this format as "Arrow IPC",
+since its origin was as a means to transmit tables between processes
(e.g., multiple R sessions). This format can also be written to and read
from files or URLs and is essentially a high performance equivalent of
a CSV file that does a better job maintaining types.
}
\details{
-The nanoarrow package does not currently have the ability to write serialized
-IPC data: use \code{\link[arrow:write_ipc_stream]{arrow::write_ipc_stream()}}
to write data from R, or use
+The nanoarrow package implements an IPC writer; however, you can also
+use \code{\link[arrow:write_ipc_stream]{arrow::write_ipc_stream()}} to write
data from R, or use
the equivalent writer from another Arrow implementation in Python, C++,
Rust, JavaScript, Julia, C#, and beyond.
diff --git a/r/src/init.c b/r/src/init.c
index 69c94391..58fe9f83 100644
--- a/r/src/init.c
+++ b/r/src/init.c
@@ -58,6 +58,8 @@ extern SEXP nanoarrow_c_infer_ptype(SEXP schema_xptr);
extern SEXP nanoarrow_c_convert_array(SEXP array_xptr, SEXP ptype_sexp);
extern SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr);
extern SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con);
+extern SEXP nanoarrow_c_ipc_writer_connection(SEXP con);
+extern SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP
array_stream_xptr);
extern SEXP nanoarrow_c_allocate_schema(void);
extern SEXP nanoarrow_c_allocate_array(void);
extern SEXP nanoarrow_c_allocate_array_stream(void);
@@ -136,6 +138,9 @@ static const R_CallMethodDef CallEntries[] = {
1},
{"nanoarrow_c_ipc_array_reader_connection",
(DL_FUNC)&nanoarrow_c_ipc_array_reader_connection, 1},
+ {"nanoarrow_c_ipc_writer_connection",
(DL_FUNC)&nanoarrow_c_ipc_writer_connection, 1},
+ {"nanoarrow_c_ipc_writer_write_stream",
(DL_FUNC)&nanoarrow_c_ipc_writer_write_stream,
+ 2},
{"nanoarrow_c_allocate_schema", (DL_FUNC)&nanoarrow_c_allocate_schema, 0},
{"nanoarrow_c_allocate_array", (DL_FUNC)&nanoarrow_c_allocate_array, 0},
{"nanoarrow_c_allocate_array_stream",
(DL_FUNC)&nanoarrow_c_allocate_array_stream, 0},
diff --git a/r/src/ipc.c b/r/src/ipc.c
index 3039c7e6..396a25fc 100644
--- a/r/src/ipc.c
+++ b/r/src/ipc.c
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <stdint.h>
#define R_NO_REMAP
#include <R.h>
#include <Rinternals.h>
@@ -48,6 +49,50 @@ static SEXP input_stream_owning_xptr(void) {
return input_stream_xptr;
}
+// Finalizer for external pointers created by output_stream_owning_xptr():
+// releases the stream if it is still live, then frees the struct itself.
+static void finalize_output_stream_xptr(SEXP output_stream_xptr) {
+  struct ArrowIpcOutputStream* stream =
+      (struct ArrowIpcOutputStream*)R_ExternalPtrAddr(output_stream_xptr);
+  if (stream == NULL) {
+    return;
+  }
+
+  // Give the release callback a chance to free what it owns (for
+  // connection-backed streams this drops the preserved R connection)
+  if (stream->release != NULL) {
+    stream->release(stream);
+  }
+
+  ArrowFree(stream);
+}
+
+// Allocates an ArrowIpcOutputStream (release set to NULL so that it reads as
+// not-yet-initialized) wrapped in an external pointer whose finalizer releases
+// the stream, if needed, and frees it. Raises an R error if allocation fails.
+static SEXP output_stream_owning_xptr(void) {
+  struct ArrowIpcOutputStream* output_stream =
+      (struct ArrowIpcOutputStream*)ArrowMalloc(sizeof(struct ArrowIpcOutputStream));
+  if (output_stream == NULL) {
+    Rf_error("Failed to allocate ArrowIpcOutputStream");
+  }
+
+  output_stream->release = NULL;
+  SEXP output_stream_xptr =
+      PROTECT(R_MakeExternalPtr(output_stream, R_NilValue, R_NilValue));
+  R_RegisterCFinalizer(output_stream_xptr, &finalize_output_stream_xptr);
+  UNPROTECT(1);
+  return output_stream_xptr;
+}
+
+// Finalizer for external pointers created by writer_owning_xptr(): resets the
+// writer if it holds private state, then frees the struct itself.
+static void finalize_writer_xptr(SEXP writer_xptr) {
+  struct ArrowIpcWriter* ipc_writer =
+      (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr);
+  if (ipc_writer == NULL) {
+    return;
+  }
+
+  // writer_owning_xptr() starts private_data at NULL; only reset a writer
+  // whose private state has been populated
+  if (ipc_writer->private_data != NULL) {
+    ArrowIpcWriterReset(ipc_writer);
+  }
+
+  ArrowFree(ipc_writer);
+}
+
+// Allocates an ArrowIpcWriter (private_data set to NULL so that it reads as
+// not-yet-initialized) wrapped in an external pointer whose finalizer resets
+// the writer, if needed, and frees it. Raises an R error if allocation fails.
+static SEXP writer_owning_xptr(void) {
+  struct ArrowIpcWriter* writer =
+      (struct ArrowIpcWriter*)ArrowMalloc(sizeof(struct ArrowIpcWriter));
+  if (writer == NULL) {
+    Rf_error("Failed to allocate ArrowIpcWriter");
+  }
+
+  writer->private_data = NULL;
+  SEXP writer_xptr = PROTECT(R_MakeExternalPtr(writer, R_NilValue, R_NilValue));
+  R_RegisterCFinalizer(writer_xptr, &finalize_writer_xptr);
+  UNPROTECT(1);
+  return writer_xptr;
+}
+
SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr) {
struct ArrowBuffer* buffer = buffer_from_xptr(buffer_xptr);
@@ -82,7 +127,7 @@ struct ConnectionInputStreamHandler {
int return_code;
};
-static SEXP handle_readbin_error(SEXP cond, void* hdata) {
+static SEXP handle_readbin_writebin_error(SEXP cond, void* hdata) {
struct ConnectionInputStreamHandler* data = (struct
ConnectionInputStreamHandler*)hdata;
SEXP fun = PROTECT(Rf_install("conditionMessage"));
@@ -103,7 +148,7 @@ static SEXP call_readbin(void* hdata) {
SEXP n = PROTECT(Rf_ScalarReal((double)data->buf_size_bytes));
SEXP call = PROTECT(Rf_lang4(nanoarrow_sym_readbin, data->con,
nanoarrow_ptype_raw, n));
- SEXP result = PROTECT(Rf_eval(call, R_BaseEnv));
+ SEXP result = PROTECT(Rf_eval(call, nanoarrow_ns_pkg));
R_xlen_t bytes_read = Rf_xlength(result);
memcpy(data->buf, RAW(result), bytes_read);
*(data->size_read_out) = bytes_read;
@@ -112,6 +157,36 @@ static SEXP call_readbin(void* hdata) {
return R_NilValue;
}
+// Body evaluated under R_tryCatchError(): copies data->buf into raw vectors and
+// passes them to write_bin_wrapper() (nanoarrow_sym_writebin) along with the
+// connection. Does not modify *data.
+static SEXP call_writebin(void* hdata) {
+  struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata;
+
+  // Write in chunks of at most 16 MB. This is a balance between being small
+  // enough not to copy too much of the source unnecessarily and big enough to
+  // avoid unnecessary R evaluation overhead. The scratch vector is sized to
+  // the data actually being written so that small writes don't each allocate
+  // a 16 MB vector.
+  const int64_t max_chunk_size = 16777216;
+  const uint8_t* src = (const uint8_t*)data->buf;
+  int64_t remaining = data->buf_size_bytes;
+  if (remaining <= 0) {
+    return R_NilValue;
+  }
+
+  // Full-size chunks reuse a single scratch vector and call object
+  int64_t chunk_size = remaining < max_chunk_size ? remaining : max_chunk_size;
+  SEXP chunk = PROTECT(Rf_allocVector(RAWSXP, (R_xlen_t)chunk_size));
+  SEXP call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk, data->con));
+  while (remaining >= chunk_size) {
+    memcpy(RAW(chunk), src, chunk_size);
+    Rf_eval(call, nanoarrow_ns_pkg);
+    src += chunk_size;
+    remaining -= chunk_size;
+  }
+  UNPROTECT(2);
+
+  // Write the tail (only non-empty when the input exceeded one full chunk)
+  if (remaining > 0) {
+    chunk = PROTECT(Rf_allocVector(RAWSXP, (R_xlen_t)remaining));
+    call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk, data->con));
+    memcpy(RAW(chunk), src, remaining);
+    Rf_eval(call, nanoarrow_ns_pkg);
+    UNPROTECT(2);
+  }
+
+  return R_NilValue;
+}
+
static ArrowErrorCode read_con_input_stream(struct ArrowIpcInputStream* stream,
uint8_t* buf, int64_t
buf_size_bytes,
int64_t* size_read_out,
@@ -129,7 +204,32 @@ static ArrowErrorCode read_con_input_stream(struct
ArrowIpcInputStream* stream,
data.error = error;
data.return_code = NANOARROW_OK;
- R_tryCatchError(&call_readbin, &data, &handle_readbin_error, &data);
+ R_tryCatchError(&call_readbin, &data, &handle_readbin_writebin_error, &data);
+ return data.return_code;
+}
+
+// ArrowIpcOutputStream write callback that forwards bytes to the R connection
+// stored in stream->private_data. R errors raised while writing are caught and
+// converted to an error code/message by handle_readbin_writebin_error().
+static ArrowErrorCode write_con_output_stream(struct ArrowIpcOutputStream* stream,
+                                              const void* buf, int64_t buf_size_bytes,
+                                              int64_t* size_write_out,
+                                              struct ArrowError* error) {
+  // R connections must not be touched from a thread other than R's
+  if (!nanoarrow_is_main_thread()) {
+    ArrowErrorSet(error, "Can't write to R connection on a non-R thread");
+    return EIO;
+  }
+
+  // The read-side handler struct carries state through R_tryCatchError();
+  // size_read_out is unused when writing
+  struct ConnectionInputStreamHandler data;
+  data.con = (SEXP)stream->private_data;
+  data.buf = (void*)buf;
+  data.buf_size_bytes = buf_size_bytes;
+  data.size_read_out = NULL;
+  data.error = error;
+  data.return_code = NANOARROW_OK;
+
+  R_tryCatchError(&call_writebin, &data, &handle_readbin_writebin_error, &data);
+
+  // This implementation always blocks until all bytes have been written
+  *size_write_out = buf_size_bytes;
+
return data.return_code;
}
@@ -137,6 +237,10 @@ static void release_con_input_stream(struct
ArrowIpcInputStream* stream) {
nanoarrow_release_sexp((SEXP)stream->private_data);
}
+// Release callback for connection-backed output streams: undoes the
+// nanoarrow_preserve_sexp() taken on the connection when the stream was set up
+static void release_con_output_stream(struct ArrowIpcOutputStream* stream) {
+  SEXP con = (SEXP)stream->private_data;
+  nanoarrow_release_sexp(con);
+}
+
SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) {
SEXP array_stream_xptr = PROTECT(nanoarrow_array_stream_owning_xptr());
struct ArrowArrayStream* array_stream =
@@ -153,9 +257,46 @@ SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) {
int code = ArrowIpcArrayStreamReaderInit(array_stream, input_stream, NULL);
if (code != NANOARROW_OK) {
- Rf_error("ArrowIpcArrayStreamReaderInit() failed");
+ Rf_error("ArrowIpcArrayStreamReaderInit() failed with errno %d", code);
}
UNPROTECT(2);
return array_stream_xptr;
}
+
+// Creates an ArrowIpcWriter (returned as an owning external pointer) whose
+// output stream writes to the R connection `con`.
+SEXP nanoarrow_c_ipc_writer_connection(SEXP con) {
+  // Output stream backed by `con`; the connection stays preserved until the
+  // stream's release callback runs
+  SEXP stream_xptr = PROTECT(output_stream_owning_xptr());
+  struct ArrowIpcOutputStream* out =
+      (struct ArrowIpcOutputStream*)R_ExternalPtrAddr(stream_xptr);
+  out->private_data = (SEXP)con;
+  out->write = &write_con_output_stream;
+  out->release = &release_con_output_stream;
+  nanoarrow_preserve_sexp(con);
+
+  SEXP writer_xptr = PROTECT(writer_owning_xptr());
+  struct ArrowIpcWriter* ipc_writer =
+      (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr);
+
+  int status = ArrowIpcWriterInit(ipc_writer, out);
+  if (status != NANOARROW_OK) {
+    Rf_error("ArrowIpcWriterInit() failed with errno %d", status);
+  }
+
+  UNPROTECT(2);
+  return writer_xptr;
+}
+
+// Writes every batch of the array stream (plus its schema) to the writer
+// created by nanoarrow_c_ipc_writer_connection(). Raises an R error that
+// includes the errno and the writer's error message on failure.
+SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP array_stream_xptr) {
+  // Validate the writer before dereferencing it
+  if (TYPEOF(writer_xptr) != EXTPTRSXP || R_ExternalPtrAddr(writer_xptr) == NULL) {
+    Rf_error("writer_xptr must be a valid external pointer to an ArrowIpcWriter");
+  }
+
+  struct ArrowIpcWriter* writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr);
+  struct ArrowArrayStream* array_stream =
+      nanoarrow_array_stream_from_xptr(array_stream_xptr);
+
+  struct ArrowError error;
+  ArrowErrorInit(&error);
+  int code = ArrowIpcWriterWriteArrayStream(writer, array_stream, &error);
+  if (code != NANOARROW_OK) {
+    Rf_error("ArrowIpcWriterWriteArrayStream() failed with errno %d: %s", code,
+             error.message);
+  }
+
+  return R_NilValue;
+}
diff --git a/r/src/util.c b/r/src/util.c
index 6d4035ba..56d9d05a 100644
--- a/r/src/util.c
+++ b/r/src/util.c
@@ -30,6 +30,7 @@ SEXP nanoarrow_cls_schema = NULL;
SEXP nanoarrow_cls_array_stream = NULL;
SEXP nanoarrow_cls_buffer = NULL;
SEXP nanoarrow_sym_readbin = NULL;
+SEXP nanoarrow_sym_writebin = NULL;
SEXP nanoarrow_ptype_raw = NULL;
void nanoarrow_init_cached_sexps(void) {
@@ -42,7 +43,8 @@ void nanoarrow_init_cached_sexps(void) {
nanoarrow_cls_schema = PROTECT(Rf_mkString("nanoarrow_schema"));
nanoarrow_cls_array_stream = PROTECT(Rf_mkString("nanoarrow_array_stream"));
nanoarrow_cls_buffer = PROTECT(Rf_mkString("nanoarrow_buffer"));
- nanoarrow_sym_readbin = PROTECT(Rf_install("readBin"));
+ nanoarrow_sym_readbin = PROTECT(Rf_install("read_bin_wrapper"));
+ nanoarrow_sym_writebin = PROTECT(Rf_install("write_bin_wrapper"));
nanoarrow_ptype_raw = PROTECT(Rf_allocVector(RAWSXP, 0));
R_PreserveObject(nanoarrow_ns_pkg);
@@ -54,9 +56,10 @@ void nanoarrow_init_cached_sexps(void) {
R_PreserveObject(nanoarrow_cls_array_stream);
R_PreserveObject(nanoarrow_cls_buffer);
R_PreserveObject(nanoarrow_sym_readbin);
+ R_PreserveObject(nanoarrow_sym_writebin);
R_PreserveObject(nanoarrow_ptype_raw);
- UNPROTECT(11);
+ UNPROTECT(12);
}
SEXP nanoarrow_c_preserved_count(void) {
diff --git a/r/src/util.h b/r/src/util.h
index d652330e..23a9f2ed 100644
--- a/r/src/util.h
+++ b/r/src/util.h
@@ -32,6 +32,7 @@ extern SEXP nanoarrow_cls_schema;
extern SEXP nanoarrow_cls_array_stream;
extern SEXP nanoarrow_cls_buffer;
extern SEXP nanoarrow_sym_readbin;
+extern SEXP nanoarrow_sym_writebin;
extern SEXP nanoarrow_ptype_raw;
void nanoarrow_init_cached_sexps(void);
diff --git a/r/tests/testthat/test-ipc.R b/r/tests/testthat/test-ipc.R
index 85adf0f0..3944de4b 100644
--- a/r/tests/testthat/test-ipc.R
+++ b/r/tests/testthat/test-ipc.R
@@ -36,6 +36,20 @@ test_that("read_nanoarrow() works for open connections", {
)
})
+test_that("write_nanoarrow() works for open connections", {
+  # An already-open connection belongs to the caller, so write_nanoarrow()
+  # must leave it open
+  con <- rawConnection(raw(), "wb")
+  on.exit(close(con))
+
+  write_nanoarrow(data.frame(), con)
+  expect_true(isOpen(con))
+  expect_identical(
+    as.data.frame(read_nanoarrow(rawConnectionValue(con))),
+    data.frame()
+  )
+})
+
test_that("read_nanoarrow() works for unopened connections", {
tf <- tempfile()
on.exit(unlink(tf))
@@ -56,6 +70,20 @@ test_that("read_nanoarrow() works for unopened connections",
{
)
})
+test_that("write_nanoarrow() works for unopened connections", {
+  tf <- tempfile()
+  on.exit(unlink(tf))
+
+  con <- file(tf)
+  # Don't close on exit: write_nanoarrow() takes ownership of an unopened
+  # connection and closes (destroys) it itself
+
+  write_nanoarrow(data.frame(), con)
+  expect_error(
+    close(con),
+    "invalid connection"
+  )
+
+  # ...and the data must actually have been written to the file
+  expect_identical(as.data.frame(read_nanoarrow(tf)), data.frame())
+})
+
test_that("read_nanoarrow() works for file paths", {
tf <- tempfile()
on.exit(unlink(tf))
@@ -71,6 +99,15 @@ test_that("read_nanoarrow() works for file paths", {
)
})
+test_that("write_nanoarrow() works for file paths", {
+  path <- tempfile()
+  on.exit(unlink(path))
+
+  df <- data.frame(letters = letters)
+  # write_nanoarrow() returns its input (invisibly) so it can be piped
+  returned <- write_nanoarrow(df, path)
+  expect_identical(returned, df)
+
+  roundtrip <- as.data.frame(read_nanoarrow(path))
+  expect_identical(roundtrip, df)
+})
+
test_that("read_nanoarrow() works for URLs", {
tf <- tempfile()
on.exit(unlink(tf))
@@ -86,6 +123,15 @@ test_that("read_nanoarrow() works for URLs", {
)
})
+test_that("write_nanoarrow() works for URLs", {
+  tf <- tempfile()
+  on.exit(unlink(tf))
+
+  df <- data.frame(letters = letters)
+  file_url <- paste0("file://", tf)
+  expect_identical(write_nanoarrow(df, file_url), df)
+
+  # Read back through the plain path to check the URL pointed at the same file
+  expect_identical(as.data.frame(read_nanoarrow(tf)), df)
+})
+
test_that("read_nanoarrow() works for compressed .gz file paths", {
tf <- tempfile(fileext = ".gz")
on.exit(unlink(tf))
@@ -101,6 +147,15 @@ test_that("read_nanoarrow() works for compressed .gz file
paths", {
)
})
+test_that("write_nanoarrow() works for compressed .gz file paths", {
+  tf <- tempfile(fileext = ".gz")
+  on.exit(unlink(tf))
+
+  df <- data.frame(letters = letters)
+  expect_identical(write_nanoarrow(df, tf), df)
+
+  # gzfile() can also read uncompressed files, so a read-back alone doesn't
+  # prove compression happened: check for the gzip magic bytes too
+  expect_identical(readBin(tf, "raw", n = 2), as.raw(c(0x1f, 0x8b)))
+  expect_identical(as.data.frame(read_nanoarrow(tf)), df)
+})
+
test_that("read_nanoarrow() works for compressed .bz2 file paths", {
tf <- tempfile(fileext = ".bz2")
on.exit(unlink(tf))
@@ -116,6 +171,15 @@ test_that("read_nanoarrow() works for compressed .bz2 file
paths", {
)
})
+test_that("write_nanoarrow() works for compressed .bz2 file paths", {
+  tf <- tempfile(fileext = ".bz2")
+  on.exit(unlink(tf))
+
+  df <- data.frame(letters = letters)
+  expect_identical(write_nanoarrow(df, tf), df)
+
+  # A read-back alone doesn't prove compression happened: check for the
+  # bzip2 magic bytes ("BZh") too
+  expect_identical(readBin(tf, "raw", n = 3), charToRaw("BZh"))
+  expect_identical(as.data.frame(read_nanoarrow(tf)), df)
+})
+
test_that("read_nanoarrow() works for compressed .zip file paths", {
tf <- tempfile(fileext = ".zip")
tdir <- tempfile()
@@ -144,6 +208,17 @@ test_that("read_nanoarrow() works for compressed .zip file
paths", {
)
})
+test_that("write_nanoarrow() errors for compressed .zip file paths", {
+  zip_path <- tempfile(fileext = ".zip")
+  on.exit(unlink(zip_path))
+
+  # Writing zip archives is not supported (reading them is)
+  expect_error(
+    write_nanoarrow(data.frame(letters = letters), zip_path),
+    "zip compression not supported"
+  )
+})
+
test_that("read_nanoarrow() errors for compressed URL paths", {
expect_error(
read_nanoarrow("https://something.zip"),
@@ -151,11 +226,15 @@ test_that("read_nanoarrow() errors for compressed URL
paths", {
)
})
-test_that("read_nanoarrow() errors for input with length != 1", {
+test_that("read|write_nanoarrow() errors for input with length != 1", {
expect_error(
read_nanoarrow(character(0)),
"Can't interpret character"
)
+  expect_error(
+    write_nanoarrow(data.frame(), character(0)),
+    "Can't interpret character"
+  )
+  # Length > 1 must also be rejected (before any file is created)
+  expect_error(
+    write_nanoarrow(data.frame(), c("a.arrows", "b.arrows")),
+    "Can't interpret character"
+  )
})
test_that("read_nanoarrow() errors zip archives that contain files != 1", {
@@ -187,7 +266,7 @@ test_that("read_nanoarrow() reports errors from readBin", {
writeLines("this is not a binary file", tf)
con <- file(tf, open = "r")
- on.exit(close(con))
+ on.exit(close(con), add = TRUE)
expect_error(
read_nanoarrow(con),
@@ -195,6 +274,18 @@ test_that("read_nanoarrow() reports errors from readBin", {
)
})
+test_that("write_nanoarrow() reports errors from writeBin", {
+  tf <- tempfile()
+  on.exit(unlink(tf))
+
+  # Opened in text mode ("w"), so the binary write is expected to fail and
+  # the R error should surface through the C writer
+  con <- file(tf, open = "w")
+  on.exit(close(con), add = TRUE)
+
+  expect_error(write_nanoarrow(data.frame(), con), "R execution error")
+})
+
test_that("read_nanoarrow() respects lazy argument", {
expect_error(
read_nanoarrow(raw(0), lazy = FALSE),