pitrou commented on code in PR #13635:
URL: https://github.com/apache/arrow/pull/13635#discussion_r926822836
##########
r/src/safe-call-into-r.h:
##########
@@ -41,111 +49,188 @@
// SafeCallIntoR<cpp_type>([&]() { ... }).
class MainRThread {
public:
- MainRThread() : initialized_(false), executor_(nullptr) {}
+ MainRThread() : initialized_(false), executor_(nullptr),
stop_source_(nullptr) {}
// Call this method from the R thread (e.g., on package load)
// to save an internal copy of the thread id.
void Initialize() {
thread_id_ = std::this_thread::get_id();
initialized_ = true;
- SetError(R_NilValue);
+ ResetError();
+ arrow::ResetSignalStopSource();
+ stop_source_ = arrow::ValueOrStop(arrow::SetSignalStopSource());
}
bool IsInitialized() { return initialized_; }
// Check if the current thread is the main R thread
bool IsMainThread() { return initialized_ && std::this_thread::get_id() ==
thread_id_; }
+ arrow::StopToken GetStopToken() {
+ if (SignalStopSourceEnabled()) {
+ return stop_source_->token();
+ } else {
+ return arrow::StopToken::Unstoppable();
+ }
+ }
+
+ bool SignalStopSourceEnabled() { return stop_source_ != nullptr; }
+
+ // Check if a SafeCallIntoR call is able to execute
+ bool CanExecuteSafeCallIntoR() { return IsMainThread() || executor_ !=
nullptr; }
+
// The Executor that is running on the main R thread, if it exists
arrow::internal::Executor*& Executor() { return executor_; }
- // Save an error token generated from a cpp11::unwind_exception
- // so that it can be properly handled after some cleanup code
- // has run (e.g., cancelling some futures or waiting for them
- // to finish).
- void SetError(cpp11::sexp token) { error_token_ = token; }
+ // Save an error (possibly with an error token generated from
+ // a cpp11::unwind_exception) so that it can be properly handled
+ // after some cleanup code has run (e.g., cancelling some futures
+ // or waiting for them to finish).
+ void SetError(arrow::Status status) { status_ = status; }
- void ResetError() { error_token_ = R_NilValue; }
+ void ResetError() { status_ = arrow::Status::OK(); }
// Check if there is a saved error
- bool HasError() { return error_token_ != R_NilValue; }
+ bool HasError() { return !status_.ok(); }
- // Throw a cpp11::unwind_exception() with the saved token if it exists
+ // Throw an exception if there was an error executing on the main
+ // thread.
void ClearError() {
- if (HasError()) {
- cpp11::unwind_exception e(error_token_);
- ResetError();
- throw e;
+ if (SignalStopSourceEnabled()) {
+ stop_source_->Reset();
Review Comment:
Ah, right, the StopToken is persisted on the IOContext... hmm, that's a more
general usability issue that I hadn't though about :-/
Can you perhaps open a JIRA about this problem?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]