This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new e0d73c5fd5 GH-38828: [R] Ensure that streams can be written to socket
connections (#38897)
e0d73c5fd5 is described below
commit e0d73c5fd521e9cf79f8c0bbe74285ab8fd4b6d2
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Apr 3 11:14:27 2024 -0300
GH-38828: [R] Ensure that streams can be written to socket connections
(#38897)
### Rationale for this change
Currently we can't write to a socket connection from R. This is a very useful
way to send Arrow data around and should work!
### What changes are included in this PR?
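Implements `Tell()` for non-seekable R connections. R's `seek()` (used as a
tell() implementation) errors for non-seekable connections, so the stream now
reports the number of bytes written so far instead. Some Arrow code, including
at least the IPC writer, calls `Tell()` to figure out how many bytes have been
written. The PR also lists connections among the accepted `file`/`sink`
arguments in the read/write documentation and marks overridden methods with
`override`.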
### Are these changes tested?
I'm not quite sure how to test this: all output streams we can easily test
are seekable. We could try to spin up a socket server in a background process
(like the reprex below), but I'm worried that would be flaky.
### Are there any user-facing changes?
Yes (something that should have previously worked now works). Apart from
listing connections as accepted `file`/`sink` arguments (added in this PR),
there is no place where we currently document how connections can be used.
``` r
tmp <- tempfile()

# Server: a background R process that listens on port 6011 and prints the
# first rows of each Arrow IPC stream it receives (stdout goes to `tmp`).
proc <- callr::r_bg(function() {
  server <- function() {
    library(arrow)
    while (TRUE) {
      writeLines("Listening...")
      con <- socketConnection(host = "localhost", port = 6011, blocking = TRUE,
                              server = TRUE, open = "r+b")
      socketTimeout(con, 3600)
      data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
      print(head(as.data.frame(data)))
      # Close each accepted connection so the loop does not leak one per client.
      close(con)
    }
  }
  server()
}, stdout = tmp)
Sys.sleep(0.5)

# Client: connect to the server and write a record batch as an IPC stream.
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
rb <- arrow::record_batch(iris)
socketDriver <- socketConnection(host = "localhost",
                                 port = 6011,
                                 blocking = TRUE,
                                 server = FALSE,
                                 open = "w+b")
write_ipc_stream(rb, socketDriver)
Sys.sleep(0.5)
close(socketDriver)
cat(brio::read_file(tmp))
#> Listening...
#> Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#> 1 5.1 3.5 1.4 0.2 setosa
#> 2 4.9 3.0 1.4 0.2 setosa
#> 3 4.7 3.2 1.3 0.2 setosa
#> 4 4.6 3.1 1.5 0.2 setosa
#> 5 5.0 3.6 1.4 0.2 setosa
#> 6 5.4 3.9 1.7 0.4 setosa
#> Listening...
# Shut down the server
proc$interrupt()
#> [1] TRUE
Sys.sleep(0.5)
proc$is_alive()
#> [1] FALSE
```
<sup>Created on 2023-11-27 with [reprex
v2.0.2](https://reprex.tidyverse.org)</sup>
* Closes: #38828
* GitHub Issue: #38828
Authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Dewey Dunnington <[email protected]>
---
r/R/csv.R | 7 +++---
r/R/feather.R | 2 +-
r/R/ipc-stream.R | 4 ++--
r/R/parquet.R | 2 +-
r/man/read_delim_arrow.Rd | 5 +++--
r/man/read_feather.Rd | 4 ++--
r/man/read_ipc_stream.Rd | 4 ++--
r/man/read_json_arrow.Rd | 5 +++--
r/man/read_parquet.Rd | 4 ++--
r/man/write_csv_arrow.Rd | 2 +-
r/man/write_feather.Rd | 2 +-
r/man/write_ipc_stream.Rd | 2 +-
r/man/write_parquet.Rd | 2 +-
r/src/io.cpp | 57 +++++++++++++++++++++++++++++++++++++----------
14 files changed, 69 insertions(+), 33 deletions(-)
diff --git a/r/R/csv.R b/r/R/csv.R
index 03540006ca..7335475703 100644
--- a/r/R/csv.R
+++ b/r/R/csv.R
@@ -78,8 +78,9 @@
#' `col_names`, and the CSV file has a header row that would otherwise be used
#' to identify column names, you'll need to add `skip = 1` to skip that row.
#'
-#' @param file A character file name or URI, literal data (either a single
string or a [raw] vector),
-#' an Arrow input stream, or a `FileSystem` with path (`SubTreeFileSystem`).
+#' @param file A character file name or URI, connection, literal data (either a
+#' single string or a [raw] vector), an Arrow input stream, or a `FileSystem`
+#' with path (`SubTreeFileSystem`).
#'
#' If a file name, a memory-mapped Arrow [InputStream] will be opened and
#' closed when finished; compression will be detected from the file extension
@@ -894,7 +895,7 @@ readr_to_csv_convert_options <- function(na,
#' Write CSV file to disk
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' @param sink A string file path, connection, URI, or [OutputStream], or path
in a file
#' system (`SubTreeFileSystem`)
#' @param file file name. Specify this or `sink`, not both.
#' @param include_header Whether to write an initial header line with column
names
diff --git a/r/R/feather.R b/r/R/feather.R
index 474fc6118e..aa08dfdbc9 100644
--- a/r/R/feather.R
+++ b/r/R/feather.R
@@ -29,7 +29,7 @@
#' [write_ipc_file()] can only write V2 files.
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' @param sink A string file path, connection, URI, or [OutputStream], or path
in a file
#' system (`SubTreeFileSystem`)
#' @param version integer Feather file version, Version 1 or Version 2.
Version 2 is the default.
#' @param chunk_size For V2 files, the number of rows that each chunk of data
diff --git a/r/R/ipc-stream.R b/r/R/ipc-stream.R
index 37ef0bbaf2..26a61a790f 100644
--- a/r/R/ipc-stream.R
+++ b/r/R/ipc-stream.R
@@ -82,8 +82,8 @@ write_to_raw <- function(x, format = c("stream", "file")) {
#' a "stream" format and a "file" format, known as Feather. `read_ipc_stream()`
#' and [read_feather()] read those formats, respectively.
#'
-#' @param file A character file name or URI, `raw` vector, an Arrow input
stream,
-#' or a `FileSystem` with path (`SubTreeFileSystem`).
+#' @param file A character file name or URI, connection, `raw` vector, an
+#' Arrow input stream, or a `FileSystem` with path (`SubTreeFileSystem`).
#' If a file name or URI, an Arrow [InputStream] will be opened and
#' closed when finished. If an input stream is provided, it will be left
#' open.
diff --git a/r/R/parquet.R b/r/R/parquet.R
index d92e913cb5..0ee6c62601 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -90,7 +90,7 @@ read_parquet <- function(file,
#' article} for examples of this.
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' @param sink A string file path, connection, URI, or [OutputStream], or path
in a file
#' system (`SubTreeFileSystem`)
#' @param chunk_size how many rows of data to write to disk at once. This
#' directly corresponds to how many rows will be in each row group in
diff --git a/r/man/read_delim_arrow.Rd b/r/man/read_delim_arrow.Rd
index b56d445c9e..f946785e4a 100644
--- a/r/man/read_delim_arrow.Rd
+++ b/r/man/read_delim_arrow.Rd
@@ -90,8 +90,9 @@ read_tsv_arrow(
)
}
\arguments{
-\item{file}{A character file name or URI, literal data (either a single string
or a \link{raw} vector),
-an Arrow input stream, or a \code{FileSystem} with path
(\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, literal data (either a
+single string or a \link{raw} vector), an Arrow input stream, or a
\code{FileSystem}
+with path (\code{SubTreeFileSystem}).
If a file name, a memory-mapped Arrow \link{InputStream} will be opened and
closed when finished; compression will be detected from the file extension
diff --git a/r/man/read_feather.Rd b/r/man/read_feather.Rd
index c3b4a54158..95661d9778 100644
--- a/r/man/read_feather.Rd
+++ b/r/man/read_feather.Rd
@@ -10,8 +10,8 @@ read_feather(file, col_select = NULL, as_data_frame = TRUE,
mmap = TRUE)
read_ipc_file(file, col_select = NULL, as_data_frame = TRUE, mmap = TRUE)
}
\arguments{
-\item{file}{A character file name or URI, \code{raw} vector, an Arrow input
stream,
-or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, \code{raw} vector, an
+Arrow input stream, or a \code{FileSystem} with path
(\code{SubTreeFileSystem}).
If a file name or URI, an Arrow \link{InputStream} will be opened and
closed when finished. If an input stream is provided, it will be left
open.}
diff --git a/r/man/read_ipc_stream.Rd b/r/man/read_ipc_stream.Rd
index db930b52bd..49d3949bfc 100644
--- a/r/man/read_ipc_stream.Rd
+++ b/r/man/read_ipc_stream.Rd
@@ -7,8 +7,8 @@
read_ipc_stream(file, as_data_frame = TRUE, ...)
}
\arguments{
-\item{file}{A character file name or URI, \code{raw} vector, an Arrow input
stream,
-or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, \code{raw} vector, an
+Arrow input stream, or a \code{FileSystem} with path
(\code{SubTreeFileSystem}).
If a file name or URI, an Arrow \link{InputStream} will be opened and
closed when finished. If an input stream is provided, it will be left
open.}
diff --git a/r/man/read_json_arrow.Rd b/r/man/read_json_arrow.Rd
index 9230a9a017..f289d3356e 100644
--- a/r/man/read_json_arrow.Rd
+++ b/r/man/read_json_arrow.Rd
@@ -13,8 +13,9 @@ read_json_arrow(
)
}
\arguments{
-\item{file}{A character file name or URI, literal data (either a single string
or a \link{raw} vector),
-an Arrow input stream, or a \code{FileSystem} with path
(\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, literal data (either a
+single string or a \link{raw} vector), an Arrow input stream, or a
\code{FileSystem}
+with path (\code{SubTreeFileSystem}).
If a file name, a memory-mapped Arrow \link{InputStream} will be opened and
closed when finished; compression will be detected from the file extension
diff --git a/r/man/read_parquet.Rd b/r/man/read_parquet.Rd
index 4f19365295..95ee4ac5a8 100644
--- a/r/man/read_parquet.Rd
+++ b/r/man/read_parquet.Rd
@@ -14,8 +14,8 @@ read_parquet(
)
}
\arguments{
-\item{file}{A character file name or URI, \code{raw} vector, an Arrow input
stream,
-or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, \code{raw} vector, an
+Arrow input stream, or a \code{FileSystem} with path
(\code{SubTreeFileSystem}).
If a file name or URI, an Arrow \link{InputStream} will be opened and
closed when finished. If an input stream is provided, it will be left
open.}
diff --git a/r/man/write_csv_arrow.Rd b/r/man/write_csv_arrow.Rd
index 9fcca49fad..9f9fde74cc 100644
--- a/r/man/write_csv_arrow.Rd
+++ b/r/man/write_csv_arrow.Rd
@@ -19,7 +19,7 @@ write_csv_arrow(
\arguments{
\item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
-\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+\item{sink}{A string file path, connection, URI, or \link{OutputStream}, or
path in a file
system (\code{SubTreeFileSystem})}
\item{file}{file name. Specify this or \code{sink}, not both.}
diff --git a/r/man/write_feather.Rd b/r/man/write_feather.Rd
index 0d3a7da3b9..823bd2224e 100644
--- a/r/man/write_feather.Rd
+++ b/r/man/write_feather.Rd
@@ -25,7 +25,7 @@ write_ipc_file(
\arguments{
\item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
-\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+\item{sink}{A string file path, connection, URI, or \link{OutputStream}, or
path in a file
system (\code{SubTreeFileSystem})}
\item{version}{integer Feather file version, Version 1 or Version 2. Version 2
is the default.}
diff --git a/r/man/write_ipc_stream.Rd b/r/man/write_ipc_stream.Rd
index 094e3ad11a..da9bb6bcac 100644
--- a/r/man/write_ipc_stream.Rd
+++ b/r/man/write_ipc_stream.Rd
@@ -9,7 +9,7 @@ write_ipc_stream(x, sink, ...)
\arguments{
\item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
-\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+\item{sink}{A string file path, connection, URI, or \link{OutputStream}, or
path in a file
system (\code{SubTreeFileSystem})}
\item{...}{extra parameters passed to \code{write_feather()}.}
diff --git a/r/man/write_parquet.Rd b/r/man/write_parquet.Rd
index 480abb12fc..954c692dad 100644
--- a/r/man/write_parquet.Rd
+++ b/r/man/write_parquet.Rd
@@ -22,7 +22,7 @@ write_parquet(
\arguments{
\item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
-\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+\item{sink}{A string file path, connection, URI, or \link{OutputStream}, or
path in a file
system (\code{SubTreeFileSystem})}
\item{chunk_size}{how many rows of data to write to disk at once. This
diff --git a/r/src/io.cpp b/r/src/io.cpp
index 4d5ee31794..2f36d51dcd 100644
--- a/r/src/io.cpp
+++ b/r/src/io.cpp
@@ -212,11 +212,16 @@ void io___BufferOutputStream__Write(
class RConnectionFileInterface : public virtual arrow::io::FileInterface {
public:
explicit RConnectionFileInterface(cpp11::sexp connection_sexp)
- : connection_sexp_(connection_sexp), closed_(false) {
+ : connection_sexp_(connection_sexp),
+ closed_(false),
+ seekable_(false),
+ bytes_written_(0),
+ bytes_read_(0) {
check_closed();
+ seekable_ = check_seekable();
}
- arrow::Status Close() {
+ arrow::Status Close() override {
if (closed_) {
return arrow::Status::OK();
}
@@ -227,11 +232,21 @@ class RConnectionFileInterface : public virtual
arrow::io::FileInterface {
"close() on R connection");
}
- arrow::Result<int64_t> Tell() const {
+ arrow::Result<int64_t> Tell() const override {
if (closed()) {
return arrow::Status::IOError("R connection is closed");
}
+ // R connections use seek() with no additional arguments as a tell()
+ // implementation; however, non-seekable connections will error if you
+ // do this. This heuristic allows Tell() to return a reasonable value
+ // (used by at least the IPC writer).
+ if (!seekable_ && bytes_written_ > 0) {
+ return bytes_written_;
+ } else if (!seekable_) {
+ return bytes_read_;
+ }
+
return SafeCallIntoR<int64_t>(
[&]() {
cpp11::sexp result =
cpp11::package("base")["seek"](connection_sexp_);
@@ -240,7 +255,7 @@ class RConnectionFileInterface : public virtual
arrow::io::FileInterface {
"tell() on R connection");
}
- bool closed() const { return closed_; }
+ bool closed() const override { return closed_; }
protected:
cpp11::sexp connection_sexp_;
@@ -261,13 +276,14 @@ class RConnectionFileInterface : public virtual
arrow::io::FileInterface {
return SafeCallIntoR<int64_t>(
[&] {
cpp11::function read_bin = cpp11::package("base")["readBin"];
- cpp11::writable::raws ptype((R_xlen_t)0);
+ cpp11::writable::raws ptype(static_cast<R_xlen_t>(0));
cpp11::integers n = cpp11::as_sexp<int>(static_cast<int>(nbytes));
cpp11::sexp result = read_bin(connection_sexp_, ptype, n);
int64_t result_size = cpp11::safe[Rf_xlength](result);
memcpy(out, cpp11::safe[RAW](result), result_size);
+ bytes_read_ += result_size;  // advance by bytes received (not calls) so Tell() reports a byte offset
return result_size;
},
"readBin() on R connection");
@@ -294,6 +310,7 @@ class RConnectionFileInterface : public virtual
arrow::io::FileInterface {
cpp11::function write_bin = cpp11::package("base")["writeBin"];
write_bin(data_raw, connection_sexp_);
+ bytes_written_ += nbytes;
},
"writeBin() on R connection");
}
@@ -312,6 +329,9 @@ class RConnectionFileInterface : public virtual
arrow::io::FileInterface {
private:
bool closed_;
+ bool seekable_;
+ int64_t bytes_written_;
+ int64_t bytes_read_;
bool check_closed() {
if (closed_) {
@@ -333,6 +353,15 @@ class RConnectionFileInterface : public virtual
arrow::io::FileInterface {
return closed_;
}
+
+ bool check_seekable() {
+ auto is_seekable_result = SafeCallIntoR<bool>([&] {
+ cpp11::sexp result =
cpp11::package("base")["isSeekable"](connection_sexp_);
+ return cpp11::as_cpp<bool>(result);
+ });
+
+ return is_seekable_result.ok() && *is_seekable_result;
+ }
};
class RConnectionInputStream : public virtual arrow::io::InputStream,
@@ -341,9 +370,11 @@ class RConnectionInputStream : public virtual
arrow::io::InputStream,
explicit RConnectionInputStream(cpp11::sexp connection_sexp)
: RConnectionFileInterface(connection_sexp) {}
- arrow::Result<int64_t> Read(int64_t nbytes, void* out) { return
ReadBase(nbytes, out); }
+ arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+ return ReadBase(nbytes, out);
+ }
- arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+ arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
return ReadBase(nbytes);
}
};
@@ -373,13 +404,15 @@ class RConnectionRandomAccessFile : public
arrow::io::RandomAccessFile,
}
}
- arrow::Result<int64_t> GetSize() { return size_; }
+ arrow::Result<int64_t> GetSize() override { return size_; }
- arrow::Status Seek(int64_t pos) { return SeekBase(pos); }
+ arrow::Status Seek(int64_t pos) override { return SeekBase(pos); }
- arrow::Result<int64_t> Read(int64_t nbytes, void* out) { return
ReadBase(nbytes, out); }
+ arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+ return ReadBase(nbytes, out);
+ }
- arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+ arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
return ReadBase(nbytes);
}
@@ -393,7 +426,7 @@ class RConnectionOutputStream : public
arrow::io::OutputStream,
explicit RConnectionOutputStream(cpp11::sexp connection_sexp)
: RConnectionFileInterface(connection_sexp) {}
- arrow::Status Write(const void* data, int64_t nbytes) {
+ arrow::Status Write(const void* data, int64_t nbytes) override {
return WriteBase(data, nbytes);
}
};