This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new e0d73c5fd5 GH-38828: [R] Ensure that streams can be written to socket 
connections (#38897)
e0d73c5fd5 is described below

commit e0d73c5fd521e9cf79f8c0bbe74285ab8fd4b6d2
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Apr 3 11:14:27 2024 -0300

    GH-38828: [R] Ensure that streams can be written to socket connections 
(#38897)
    
    ### Rationale for this change
    
    Currently we can't write to a socket connection from R. This is a very 
useful way to send Arrow data around and should work!
    
    ### What changes are included in this PR?
    
    Implements `Tell()` for non-seekable output streams. Apparently some Arrow 
code calls this to figure out how many bytes have been written.
    
    ### Are these changes tested?
    
    I'm not quite sure how to test this...all output streams we can easily test 
are seekable. We could try to spin up a socket server on another thread (like 
the reprex below) but I'm worried that will be flaky.
    
    ### Are there any user-facing changes?
    
    Yes (something that should have previously worked now works), although 
there is no place where we currently document anything about how connections 
can be used.
    
    ``` r
    tmp <- tempfile()
    proc <- callr::r_bg(function() {
      server <- function() {
        library(arrow)
    
        while (TRUE) {
          writeLines("Listening...")
          con <- socketConnection(host = "localhost", port = 6011, blocking = 
TRUE,
                                  server = TRUE, open = "r+b")
          socketTimeout(con, 3600)
    
          data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
          print(head(as.data.frame(data)))
    
        }
      }
    
      server()
    }, stdout = tmp)
    
    Sys.sleep(0.5)
    
    library(arrow, warn.conflicts = FALSE)
    #> Some features are not enabled in this build of Arrow. Run `arrow_info()` 
for more information.
    rb <- arrow::record_batch(iris)
    
    socketDriver <- socketConnection(host = "localhost",
                                     port = "6011",
                                     blocking = TRUE,
                                     server = FALSE,
                                     open = "w+b")
    
    write_ipc_stream(rb, socketDriver)
    Sys.sleep(0.5)
    cat(brio::read_file(tmp))
    #> Listening...
    #>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
    #> 1          5.1         3.5          1.4         0.2  setosa
    #> 2          4.9         3.0          1.4         0.2  setosa
    #> 3          4.7         3.2          1.3         0.2  setosa
    #> 4          4.6         3.1          1.5         0.2  setosa
    #> 5          5.0         3.6          1.4         0.2  setosa
    #> 6          5.4         3.9          1.7         0.4  setosa
    #> Listening...
    
    # Shutdown server
    proc$interrupt()
    #> [1] TRUE
    Sys.sleep(0.5)
    proc$is_alive()
    #> [1] FALSE
    ```
    
    <sup>Created on 2023-11-27 with [reprex 
v2.0.2](https://reprex.tidyverse.org)</sup>
    * Closes: #38828
    * GitHub Issue: #38828
    
    Authored-by: Dewey Dunnington <[email protected]>
    Signed-off-by: Dewey Dunnington <[email protected]>
---
 r/R/csv.R                 |  7 +++---
 r/R/feather.R             |  2 +-
 r/R/ipc-stream.R          |  4 ++--
 r/R/parquet.R             |  2 +-
 r/man/read_delim_arrow.Rd |  5 +++--
 r/man/read_feather.Rd     |  4 ++--
 r/man/read_ipc_stream.Rd  |  4 ++--
 r/man/read_json_arrow.Rd  |  5 +++--
 r/man/read_parquet.Rd     |  4 ++--
 r/man/write_csv_arrow.Rd  |  2 +-
 r/man/write_feather.Rd    |  2 +-
 r/man/write_ipc_stream.Rd |  2 +-
 r/man/write_parquet.Rd    |  2 +-
 r/src/io.cpp              | 57 +++++++++++++++++++++++++++++++++++++----------
 14 files changed, 69 insertions(+), 33 deletions(-)

diff --git a/r/R/csv.R b/r/R/csv.R
index 03540006ca..7335475703 100644
--- a/r/R/csv.R
+++ b/r/R/csv.R
@@ -78,8 +78,9 @@
 #' `col_names`, and the CSV file has a header row that would otherwise be used
 #' to identify column names, you'll need to add `skip = 1` to skip that row.
 #'
-#' @param file A character file name or URI, literal data (either a single 
string or a [raw] vector),
-#' an Arrow input stream, or a `FileSystem` with path (`SubTreeFileSystem`).
+#' @param file A character file name or URI, connection, literal data (either a
+#' single string or a [raw] vector), an Arrow input stream, or a `FileSystem`
+#' with path (`SubTreeFileSystem`).
 #'
 #' If a file name, a memory-mapped Arrow [InputStream] will be opened and
 #' closed when finished; compression will be detected from the file extension
@@ -894,7 +895,7 @@ readr_to_csv_convert_options <- function(na,
 #' Write CSV file to disk
 #'
 #' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' @param sink A string file path, connection, URI, or [OutputStream], or path 
in a file
 #' system (`SubTreeFileSystem`)
 #' @param file file name. Specify this or `sink`, not both.
 #' @param include_header Whether to write an initial header line with column 
names
diff --git a/r/R/feather.R b/r/R/feather.R
index 474fc6118e..aa08dfdbc9 100644
--- a/r/R/feather.R
+++ b/r/R/feather.R
@@ -29,7 +29,7 @@
 #' [write_ipc_file()] can only write V2 files.
 #'
 #' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' @param sink A string file path, connection, URI, or [OutputStream], or path 
in a file
 #' system (`SubTreeFileSystem`)
 #' @param version integer Feather file version, Version 1 or Version 2. 
Version 2 is the default.
 #' @param chunk_size For V2 files, the number of rows that each chunk of data
diff --git a/r/R/ipc-stream.R b/r/R/ipc-stream.R
index 37ef0bbaf2..26a61a790f 100644
--- a/r/R/ipc-stream.R
+++ b/r/R/ipc-stream.R
@@ -82,8 +82,8 @@ write_to_raw <- function(x, format = c("stream", "file")) {
 #' a "stream" format and a "file" format, known as Feather. `read_ipc_stream()`
 #' and [read_feather()] read those formats, respectively.
 #'
-#' @param file A character file name or URI, `raw` vector, an Arrow input 
stream,
-#' or a `FileSystem` with path (`SubTreeFileSystem`).
+#' @param file A character file name or URI, connection, `raw` vector, an
+#' Arrow input stream, or a `FileSystem` with path (`SubTreeFileSystem`).
 #' If a file name or URI, an Arrow [InputStream] will be opened and
 #' closed when finished. If an input stream is provided, it will be left
 #' open.
diff --git a/r/R/parquet.R b/r/R/parquet.R
index d92e913cb5..0ee6c62601 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -90,7 +90,7 @@ read_parquet <- function(file,
 #' article} for examples of this.
 #'
 #' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' @param sink A string file path, connection, URI, or [OutputStream], or path 
in a file
 #' system (`SubTreeFileSystem`)
 #' @param chunk_size how many rows of data to write to disk at once. This
 #'    directly corresponds to how many rows will be in each row group in
diff --git a/r/man/read_delim_arrow.Rd b/r/man/read_delim_arrow.Rd
index b56d445c9e..f946785e4a 100644
--- a/r/man/read_delim_arrow.Rd
+++ b/r/man/read_delim_arrow.Rd
@@ -90,8 +90,9 @@ read_tsv_arrow(
 )
 }
 \arguments{
-\item{file}{A character file name or URI, literal data (either a single string 
or a \link{raw} vector),
-an Arrow input stream, or a \code{FileSystem} with path 
(\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, literal data (either a
+single string or a \link{raw} vector), an Arrow input stream, or a 
\code{FileSystem}
+with path (\code{SubTreeFileSystem}).
 
 If a file name, a memory-mapped Arrow \link{InputStream} will be opened and
 closed when finished; compression will be detected from the file extension
diff --git a/r/man/read_feather.Rd b/r/man/read_feather.Rd
index c3b4a54158..95661d9778 100644
--- a/r/man/read_feather.Rd
+++ b/r/man/read_feather.Rd
@@ -10,8 +10,8 @@ read_feather(file, col_select = NULL, as_data_frame = TRUE, 
mmap = TRUE)
 read_ipc_file(file, col_select = NULL, as_data_frame = TRUE, mmap = TRUE)
 }
 \arguments{
-\item{file}{A character file name or URI, \code{raw} vector, an Arrow input 
stream,
-or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, \code{raw} vector, an
+Arrow input stream, or a \code{FileSystem} with path 
(\code{SubTreeFileSystem}).
 If a file name or URI, an Arrow \link{InputStream} will be opened and
 closed when finished. If an input stream is provided, it will be left
 open.}
diff --git a/r/man/read_ipc_stream.Rd b/r/man/read_ipc_stream.Rd
index db930b52bd..49d3949bfc 100644
--- a/r/man/read_ipc_stream.Rd
+++ b/r/man/read_ipc_stream.Rd
@@ -7,8 +7,8 @@
 read_ipc_stream(file, as_data_frame = TRUE, ...)
 }
 \arguments{
-\item{file}{A character file name or URI, \code{raw} vector, an Arrow input 
stream,
-or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, \code{raw} vector, an
+Arrow input stream, or a \code{FileSystem} with path 
(\code{SubTreeFileSystem}).
 If a file name or URI, an Arrow \link{InputStream} will be opened and
 closed when finished. If an input stream is provided, it will be left
 open.}
diff --git a/r/man/read_json_arrow.Rd b/r/man/read_json_arrow.Rd
index 9230a9a017..f289d3356e 100644
--- a/r/man/read_json_arrow.Rd
+++ b/r/man/read_json_arrow.Rd
@@ -13,8 +13,9 @@ read_json_arrow(
 )
 }
 \arguments{
-\item{file}{A character file name or URI, literal data (either a single string 
or a \link{raw} vector),
-an Arrow input stream, or a \code{FileSystem} with path 
(\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, literal data (either a
+single string or a \link{raw} vector), an Arrow input stream, or a 
\code{FileSystem}
+with path (\code{SubTreeFileSystem}).
 
 If a file name, a memory-mapped Arrow \link{InputStream} will be opened and
 closed when finished; compression will be detected from the file extension
diff --git a/r/man/read_parquet.Rd b/r/man/read_parquet.Rd
index 4f19365295..95ee4ac5a8 100644
--- a/r/man/read_parquet.Rd
+++ b/r/man/read_parquet.Rd
@@ -14,8 +14,8 @@ read_parquet(
 )
 }
 \arguments{
-\item{file}{A character file name or URI, \code{raw} vector, an Arrow input 
stream,
-or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
+\item{file}{A character file name or URI, connection, \code{raw} vector, an
+Arrow input stream, or a \code{FileSystem} with path 
(\code{SubTreeFileSystem}).
 If a file name or URI, an Arrow \link{InputStream} will be opened and
 closed when finished. If an input stream is provided, it will be left
 open.}
diff --git a/r/man/write_csv_arrow.Rd b/r/man/write_csv_arrow.Rd
index 9fcca49fad..9f9fde74cc 100644
--- a/r/man/write_csv_arrow.Rd
+++ b/r/man/write_csv_arrow.Rd
@@ -19,7 +19,7 @@ write_csv_arrow(
 \arguments{
 \item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
 
-\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+\item{sink}{A string file path, connection, URI, or \link{OutputStream}, or 
path in a file
 system (\code{SubTreeFileSystem})}
 
 \item{file}{file name. Specify this or \code{sink}, not both.}
diff --git a/r/man/write_feather.Rd b/r/man/write_feather.Rd
index 0d3a7da3b9..823bd2224e 100644
--- a/r/man/write_feather.Rd
+++ b/r/man/write_feather.Rd
@@ -25,7 +25,7 @@ write_ipc_file(
 \arguments{
 \item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
 
-\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+\item{sink}{A string file path, connection, URI, or \link{OutputStream}, or 
path in a file
 system (\code{SubTreeFileSystem})}
 
 \item{version}{integer Feather file version, Version 1 or Version 2. Version 2 
is the default.}
diff --git a/r/man/write_ipc_stream.Rd b/r/man/write_ipc_stream.Rd
index 094e3ad11a..da9bb6bcac 100644
--- a/r/man/write_ipc_stream.Rd
+++ b/r/man/write_ipc_stream.Rd
@@ -9,7 +9,7 @@ write_ipc_stream(x, sink, ...)
 \arguments{
 \item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
 
-\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+\item{sink}{A string file path, connection, URI, or \link{OutputStream}, or 
path in a file
 system (\code{SubTreeFileSystem})}
 
 \item{...}{extra parameters passed to \code{write_feather()}.}
diff --git a/r/man/write_parquet.Rd b/r/man/write_parquet.Rd
index 480abb12fc..954c692dad 100644
--- a/r/man/write_parquet.Rd
+++ b/r/man/write_parquet.Rd
@@ -22,7 +22,7 @@ write_parquet(
 \arguments{
 \item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
 
-\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+\item{sink}{A string file path, connection, URI, or \link{OutputStream}, or 
path in a file
 system (\code{SubTreeFileSystem})}
 
 \item{chunk_size}{how many rows of data to write to disk at once. This
diff --git a/r/src/io.cpp b/r/src/io.cpp
index 4d5ee31794..2f36d51dcd 100644
--- a/r/src/io.cpp
+++ b/r/src/io.cpp
@@ -212,11 +212,16 @@ void io___BufferOutputStream__Write(
 class RConnectionFileInterface : public virtual arrow::io::FileInterface {
  public:
   explicit RConnectionFileInterface(cpp11::sexp connection_sexp)
-      : connection_sexp_(connection_sexp), closed_(false) {
+      : connection_sexp_(connection_sexp),
+        closed_(false),
+        seekable_(false),
+        bytes_written_(0),
+        bytes_read_(0) {
     check_closed();
+    seekable_ = check_seekable();
   }
 
-  arrow::Status Close() {
+  arrow::Status Close() override {
     if (closed_) {
       return arrow::Status::OK();
     }
@@ -227,11 +232,21 @@ class RConnectionFileInterface : public virtual 
arrow::io::FileInterface {
                              "close() on R connection");
   }
 
-  arrow::Result<int64_t> Tell() const {
+  arrow::Result<int64_t> Tell() const override {
     if (closed()) {
       return arrow::Status::IOError("R connection is closed");
     }
 
+    // R connections use seek() with no additional arguments as a tell()
+    // implementation; however, non-seekable connections will error if you
+    // do this. This heuristic allows Tell() to return a reasonable value
+    // (used by at least the IPC writer).
+    if (!seekable_ && bytes_written_ > 0) {
+      return bytes_written_;
+    } else if (!seekable_) {
+      return bytes_read_;
+    }
+
     return SafeCallIntoR<int64_t>(
         [&]() {
           cpp11::sexp result = 
cpp11::package("base")["seek"](connection_sexp_);
@@ -240,7 +255,7 @@ class RConnectionFileInterface : public virtual 
arrow::io::FileInterface {
         "tell() on R connection");
   }
 
-  bool closed() const { return closed_; }
+  bool closed() const override { return closed_; }
 
  protected:
   cpp11::sexp connection_sexp_;
@@ -261,13 +276,14 @@ class RConnectionFileInterface : public virtual 
arrow::io::FileInterface {
     return SafeCallIntoR<int64_t>(
         [&] {
           cpp11::function read_bin = cpp11::package("base")["readBin"];
-          cpp11::writable::raws ptype((R_xlen_t)0);
+          cpp11::writable::raws ptype(static_cast<R_xlen_t>(0));
           cpp11::integers n = cpp11::as_sexp<int>(static_cast<int>(nbytes));
 
           cpp11::sexp result = read_bin(connection_sexp_, ptype, n);
 
           int64_t result_size = cpp11::safe[Rf_xlength](result);
           memcpy(out, cpp11::safe[RAW](result), result_size);
+          bytes_read_ += result_size;  // Tell() reports bytes, not read calls
           return result_size;
         },
         "readBin() on R connection");
@@ -294,6 +310,7 @@ class RConnectionFileInterface : public virtual 
arrow::io::FileInterface {
 
           cpp11::function write_bin = cpp11::package("base")["writeBin"];
           write_bin(data_raw, connection_sexp_);
+          bytes_written_ += nbytes;
         },
         "writeBin() on R connection");
   }
@@ -312,6 +329,9 @@ class RConnectionFileInterface : public virtual 
arrow::io::FileInterface {
 
  private:
   bool closed_;
+  bool seekable_;
+  int64_t bytes_written_;
+  int64_t bytes_read_;
 
   bool check_closed() {
     if (closed_) {
@@ -333,6 +353,15 @@ class RConnectionFileInterface : public virtual 
arrow::io::FileInterface {
 
     return closed_;
   }
+
+  bool check_seekable() {
+    auto is_seekable_result = SafeCallIntoR<bool>([&] {
+      cpp11::sexp result = 
cpp11::package("base")["isSeekable"](connection_sexp_);
+      return cpp11::as_cpp<bool>(result);
+    });
+
+    return is_seekable_result.ok() && *is_seekable_result;
+  }
 };
 
 class RConnectionInputStream : public virtual arrow::io::InputStream,
@@ -341,9 +370,11 @@ class RConnectionInputStream : public virtual 
arrow::io::InputStream,
   explicit RConnectionInputStream(cpp11::sexp connection_sexp)
       : RConnectionFileInterface(connection_sexp) {}
 
-  arrow::Result<int64_t> Read(int64_t nbytes, void* out) { return 
ReadBase(nbytes, out); }
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    return ReadBase(nbytes, out);
+  }
 
-  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
     return ReadBase(nbytes);
   }
 };
@@ -373,13 +404,15 @@ class RConnectionRandomAccessFile : public 
arrow::io::RandomAccessFile,
     }
   }
 
-  arrow::Result<int64_t> GetSize() { return size_; }
+  arrow::Result<int64_t> GetSize() override { return size_; }
 
-  arrow::Status Seek(int64_t pos) { return SeekBase(pos); }
+  arrow::Status Seek(int64_t pos) override { return SeekBase(pos); }
 
-  arrow::Result<int64_t> Read(int64_t nbytes, void* out) { return 
ReadBase(nbytes, out); }
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    return ReadBase(nbytes, out);
+  }
 
-  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
     return ReadBase(nbytes);
   }
 
@@ -393,7 +426,7 @@ class RConnectionOutputStream : public 
arrow::io::OutputStream,
   explicit RConnectionOutputStream(cpp11::sexp connection_sexp)
       : RConnectionFileInterface(connection_sexp) {}
 
-  arrow::Status Write(const void* data, int64_t nbytes) {
+  arrow::Status Write(const void* data, int64_t nbytes) override {
     return WriteBase(data, nbytes);
   }
 };

Reply via email to