pitrou commented on code in PR #13635:
URL: https://github.com/apache/arrow/pull/13635#discussion_r926664323


##########
r/_pkgdown.yml:
##########
@@ -139,6 +139,7 @@ reference:
       - write_to_raw
       - write_parquet
       - write_csv_arrow
+      - arrow_cancellable

Review Comment:
   Should this change be reverted?



##########
r/src/safe-call-into-r.h:
##########
@@ -41,111 +49,188 @@
 // SafeCallIntoR<cpp_type>([&]() { ... }).
 class MainRThread {
  public:
-  MainRThread() : initialized_(false), executor_(nullptr) {}
+  MainRThread() : initialized_(false), executor_(nullptr), 
stop_source_(nullptr) {}

Review Comment:
   For the record, if this is really meant to be a singleton, perhaps you want to make the constructor private to avoid potential misuse?
   
   A typical idiom is to expose a static method to get the singleton, e.g.:
   ```c++
   class MainRThread {
    public:
     static MainRThread* GetInstance();
   
    private:
     MainRThread() : ...
   ```



##########
r/src/safe-call-into-r.h:
##########
@@ -41,111 +49,188 @@
 // SafeCallIntoR<cpp_type>([&]() { ... }).
 class MainRThread {
  public:
-  MainRThread() : initialized_(false), executor_(nullptr) {}
+  MainRThread() : initialized_(false), executor_(nullptr), 
stop_source_(nullptr) {}
 
   // Call this method from the R thread (e.g., on package load)
   // to save an internal copy of the thread id.
   void Initialize() {
     thread_id_ = std::this_thread::get_id();
     initialized_ = true;
-    SetError(R_NilValue);
+    ResetError();
+    arrow::ResetSignalStopSource();
+    stop_source_ = arrow::ValueOrStop(arrow::SetSignalStopSource());
   }
 
   bool IsInitialized() { return initialized_; }
 
   // Check if the current thread is the main R thread
   bool IsMainThread() { return initialized_ && std::this_thread::get_id() == 
thread_id_; }
 
+  arrow::StopToken GetStopToken() {
+    if (SignalStopSourceEnabled()) {
+      return stop_source_->token();
+    } else {
+      return arrow::StopToken::Unstoppable();
+    }
+  }
+
+  bool SignalStopSourceEnabled() { return stop_source_ != nullptr; }
+
+  // Check if a SafeCallIntoR call is able to execute
+  bool CanExecuteSafeCallIntoR() { return IsMainThread() || executor_ != 
nullptr; }
+
   // The Executor that is running on the main R thread, if it exists
   arrow::internal::Executor*& Executor() { return executor_; }
 
-  // Save an error token generated from a cpp11::unwind_exception
-  // so that it can be properly handled after some cleanup code
-  // has run (e.g., cancelling some futures or waiting for them
-  // to finish).
-  void SetError(cpp11::sexp token) { error_token_ = token; }
+  // Save an error (possibly with an error token generated from
+  // a cpp11::unwind_exception) so that it can be properly handled
+  // after some cleanup code  has run (e.g., cancelling some futures
+  // or waiting for them to finish).
+  void SetError(arrow::Status status) { status_ = status; }
 
-  void ResetError() { error_token_ = R_NilValue; }
+  void ResetError() { status_ = arrow::Status::OK(); }
 
   // Check if there is a saved error
-  bool HasError() { return error_token_ != R_NilValue; }
+  bool HasError() { return !status_.ok(); }
 
-  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  // Throw an exception if there was an error executing on the main
+  // thread.
   void ClearError() {
-    if (HasError()) {
-      cpp11::unwind_exception e(error_token_);
-      ResetError();
-      throw e;
+    if (SignalStopSourceEnabled()) {
+      stop_source_->Reset();

Review Comment:
   This shouldn't be needed. Instead, `ResetSignalStopSource` will trigger 
creation of a new stop source the next time `SetSignalStopSource` is called.



##########
r/src/safe-call-into-r.h:
##########
@@ -41,111 +49,188 @@
 // SafeCallIntoR<cpp_type>([&]() { ... }).
 class MainRThread {
  public:
-  MainRThread() : initialized_(false), executor_(nullptr) {}
+  MainRThread() : initialized_(false), executor_(nullptr), 
stop_source_(nullptr) {}
 
   // Call this method from the R thread (e.g., on package load)
   // to save an internal copy of the thread id.
   void Initialize() {
     thread_id_ = std::this_thread::get_id();
     initialized_ = true;
-    SetError(R_NilValue);
+    ResetError();
+    arrow::ResetSignalStopSource();
+    stop_source_ = arrow::ValueOrStop(arrow::SetSignalStopSource());
   }
 
   bool IsInitialized() { return initialized_; }
 
   // Check if the current thread is the main R thread
   bool IsMainThread() { return initialized_ && std::this_thread::get_id() == 
thread_id_; }
 
+  arrow::StopToken GetStopToken() {
+    if (SignalStopSourceEnabled()) {
+      return stop_source_->token();
+    } else {
+      return arrow::StopToken::Unstoppable();
+    }
+  }
+
+  bool SignalStopSourceEnabled() { return stop_source_ != nullptr; }
+
+  // Check if a SafeCallIntoR call is able to execute
+  bool CanExecuteSafeCallIntoR() { return IsMainThread() || executor_ != 
nullptr; }
+
   // The Executor that is running on the main R thread, if it exists
   arrow::internal::Executor*& Executor() { return executor_; }
 
-  // Save an error token generated from a cpp11::unwind_exception
-  // so that it can be properly handled after some cleanup code
-  // has run (e.g., cancelling some futures or waiting for them
-  // to finish).
-  void SetError(cpp11::sexp token) { error_token_ = token; }
+  // Save an error (possibly with an error token generated from
+  // a cpp11::unwind_exception) so that it can be properly handled
+  // after some cleanup code  has run (e.g., cancelling some futures
+  // or waiting for them to finish).
+  void SetError(arrow::Status status) { status_ = status; }
 
-  void ResetError() { error_token_ = R_NilValue; }
+  void ResetError() { status_ = arrow::Status::OK(); }
 
   // Check if there is a saved error
-  bool HasError() { return error_token_ != R_NilValue; }
+  bool HasError() { return !status_.ok(); }
 
-  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  // Throw an exception if there was an error executing on the main
+  // thread.
   void ClearError() {

Review Comment:
   Nit: the name `ClearError` is a bit misleading, since the method throws the saved error rather than just clearing it. Perhaps name it `ReraiseError` or something similar?



##########
r/src/safe-call-into-r.h:
##########
@@ -41,111 +49,188 @@
 // SafeCallIntoR<cpp_type>([&]() { ... }).
 class MainRThread {
  public:
-  MainRThread() : initialized_(false), executor_(nullptr) {}
+  MainRThread() : initialized_(false), executor_(nullptr), 
stop_source_(nullptr) {}
 
   // Call this method from the R thread (e.g., on package load)
   // to save an internal copy of the thread id.
   void Initialize() {
     thread_id_ = std::this_thread::get_id();
     initialized_ = true;
-    SetError(R_NilValue);
+    ResetError();
+    arrow::ResetSignalStopSource();

Review Comment:
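   Hmm, this doesn't seem right (though perhaps it works by chance): `ResetSignalStopSource()` is called here before `SetSignalStopSource()`, which is not an allowed order. As the C++ docstring says: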
   ```
   /// The only allowed order of calls is the following:
   /// - SetSignalStopSource()
   /// - any number of pairs of (RegisterCancellingSignalHandler,
   ///   UnregisterCancellingSignalHandler) calls
   /// - ResetSignalStopSource()
   ```



##########
r/src/safe-call-into-r.h:
##########
@@ -158,7 +243,39 @@ arrow::Result<T> 
RunWithCapturedR(std::function<arrow::Future<T>()> make_arrow_c
   GetMainRThread().ClearError();
 
   return result;
-#endif
+}
+
+// Performs an Arrow call (e.g., run an exec plan) in such a way that 
background threads
+// can use SafeCallIntoR(). This version is useful for Arrow calls that do not 
already
+// return a Future<>(). If it is not possible to use RunWithCapturedR() (i.e.,
+// CanRunWithCapturedR() returns false), this will run make_arrow_call on the 
main
+// R thread (which will cause background threads that try to SafeCallIntoR() to
+// error).
+template <typename T>
+arrow::Result<T> RunWithCapturedRIfPossible(
+    std::function<arrow::Result<T>()> make_arrow_call) {
+  if (CanRunWithCapturedR()) {
+    // Note that the use of the io_context here is arbitrary (i.e. we could use
+    // any construct that launches a background thread).
+    const auto& io_context = arrow::io::default_io_context();
+    return RunWithCapturedR<T>([&]() {
+      return 
DeferNotOk(io_context.executor()->Submit(std::move(make_arrow_call)));
+    });
+  } else {
+    return make_arrow_call();
+  }
+}
+
+// Like RunWithCapturedRIfPossible<>() but for arrow calls that don't return
+// a Result.
+static inline arrow::Status RunWithCapturedRIfPossibleVoid(
+    std::function<arrow::Status()> make_arrow_call) {
+  auto result = RunWithCapturedRIfPossible<bool>([&]() -> arrow::Result<bool> {
+    ARROW_RETURN_NOT_OK(make_arrow_call());
+    return true;
+  });
+  ARROW_RETURN_NOT_OK(result);
+  return arrow::Status::OK();

Review Comment:
   You may prefer this explicit form, or you might like a terser variant that returns the result's status directly:
   ```suggestion
     return result.status();
   ```



##########
r/src/filesystem.cpp:
##########
@@ -239,7 +240,8 @@ std::string fs___FileSystem__type_name(
 // [[arrow::export]]
 std::shared_ptr<fs::LocalFileSystem> fs___LocalFileSystem__create() {
   // Affects OpenInputFile/OpenInputStream
-  auto io_context = arrow::io::IOContext(gc_memory_pool());
+  auto io_context =
+      arrow::io::IOContext(gc_memory_pool(), GetMainRThread().GetStopToken());

Review Comment:
   Perhaps you want to factor out this R-specific IOContext creation (memory pool plus the main R thread's stop token), since it happens in many places?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to