paleolimbot commented on code in PR #12323:
URL: https://github.com/apache/arrow/pull/12323#discussion_r848520733


##########
r/src/io.cpp:
##########
@@ -207,7 +209,209 @@ void io___BufferOutputStream__Write(
   StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
 }
 
-// TransformInputStream::TransformFunc wrapper
+// ------ RConnectionInputStream / RConnectionOutputStream
+
+class RConnectionFileInterface : public virtual arrow::io::FileInterface {
+ public:
+  explicit RConnectionFileInterface(cpp11::sexp connection_sexp)
+      : connection_sexp_(connection_sexp), closed_(false) {
+    check_closed();
+  }
+
+  arrow::Status Close() {
+    if (closed_) {
+      return arrow::Status::OK();
+    }
+
+    auto result = SafeCallIntoR<bool>([&]() {
+      cpp11::package("base")["close"](connection_sexp_);
+      return true;
+    });
+
+    RETURN_NOT_OK(result);
+    closed_ = true;
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> Tell() const {
+    if (closed()) {
+      return arrow::Status::IOError("R connection is closed");
+    }
+
+    cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_);
+    return cpp11::as_cpp<int64_t>(result);
+  }
+
+  bool closed() const { return closed_; }
+
+ protected:
+  cpp11::sexp connection_sexp_;
+
+  // Define the logic here because multiple inheritance makes it difficult
+  // for this base class, the InputStream and the RandomAccessFile
+  // interfaces to co-exist.
+  arrow::Result<int64_t> ReadBase(int64_t nbytes, void* out) {
+    if (closed()) {
+      return arrow::Status::IOError("R connection is closed");
+    }
+
+    return SafeCallIntoR<int64_t>([&] {

Review Comment:
   I spent a bit of time trying this but I don't feel that I know how to do it 
safely...for `CloseAsync()` it feels like we run the risk of the event loop 
ending before the file is actually closed; for `ReadAsync()` feels like it's 
better suited to something like "open a new connection, read from it, and close 
it" and I end up with nested futures that feel wrong to me. I'm happy to take 
another try at this if you think it's important for this usage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to