pitrou commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r642890540



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)

Review comment:
       Use `nullptr` instead of `NULL`.

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might 
schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool 
from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool 
from_unfinished) {
+    if (ShouldSchedule(callback_record, from_unfinished)) {
+      // Need to make a copy of this to keep it alive until the callback has a 
chance

Review comment:
       Why "a copy"? It's not clear to me where a copy is being made.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, 
FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != 
FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run 
synchronously

Review comment:
       Use "Describe" rather than "Describes".

##########
File path: cpp/src/arrow/util/test_common.h
##########
@@ -85,4 +88,18 @@ inline void AssertIteratorExhausted(Iterator<T>& it) {
 
 Transformer<TestInt, TestStr> MakeFilter(std::function<bool(TestInt&)> filter);
 
+class MockExecutor : public internal::Executor {

Review comment:
       Ideally, we should start categorizing our test utilities better instead 
of dumping them all in generic header files.

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might 
schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool 
from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool 
from_unfinished) {
+    if (ShouldSchedule(callback_record, from_unfinished)) {
+      // Need to make a copy of this to keep it alive until the callback has a 
chance
+      // to be scheduled.
+      struct CallbackTask {
+        void operator()() { std::move(callback)(*self); }
+
+        Callback callback;
+        std::shared_ptr<FutureImpl> self;

Review comment:
       The code used to be capturing a weak pointer. Is it desirable to make 
this a strong pointer instead?

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might 
schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool 
from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool 
from_unfinished) {

Review comment:
       The coding conventions prohibit passing mutable lvalue references. You
could make this a `CallbackRecord&&`, for example.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -952,6 +951,85 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+class FutureSchedulingTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = 
std::make_shared<MockExecutor>();
+};
+
+TEST_F(FutureSchedulingTest, ScheduleAlways) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::ALWAYS;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(4, spawn_count());
+  }
+}
+
+TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::IF_UNFINISHED;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(1, spawn_count());

Review comment:
       Unfortunately, this doesn't check which of the two callbacks was spawned.

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might 
schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};

Review comment:
       Why doesn't `AddCallback` directly take a `CallbackRecord`?
   

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, 
FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != 
FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run 
synchronously
+enum ShouldSchedule {
+  /// Always run the callback synchronously (the default)
+  NEVER = 0,

Review comment:
       We should avoid using ALL_CAPS names, because of potential clashes with 
macros (this is a common issue with Windows headers, unfortunately).

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -272,8 +315,8 @@ class ConcreteFutureImpl : public FutureImpl {
     //
     // In fact, it is important not to hold the locks because the callback
     // may be slow or do its own locking on other resources
-    for (auto&& callback : callbacks_) {
-      std::move(callback)();
+    for (auto& callback_record : callbacks_) {
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/true);

Review comment:
       Shouldn't `from_unfinished` be false?

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -952,6 +951,85 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+class FutureSchedulingTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = 
std::make_shared<MockExecutor>();
+};
+
+TEST_F(FutureSchedulingTest, ScheduleAlways) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::ALWAYS;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(4, spawn_count());
+  }
+}
+
+TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::IF_UNFINISHED;

Review comment:
       NEVER is never tested?

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -256,6 +258,42 @@ TEST_P(TestRunSynchronously, PropagatedError) {
 INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
                          ::testing::Values(false, true));
 
+class TransferTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = 
std::make_shared<MockExecutor>();
+};
+
+TEST_F(TransferTest, DefaultTransferIfNotFinished) {
+  {
+    Future<> fut = Future<>::Make();
+    auto transferred = executor()->Transfer(fut);
+    fut.MarkFinished();
+    ASSERT_FINISHES_OK(transferred);
+    ASSERT_EQ(1, spawn_count());
+  }
+  {
+    Future<> fut = Future<>::Make();
+    fut.MarkFinished();
+    auto transferred = executor()->Transfer(fut);
+    ASSERT_FINISHES_OK(transferred);
+    ASSERT_EQ(1, spawn_count());
+  }
+}
+
+TEST_F(TransferTest, TransferAlways) {
+  {
+    Future<> fut = Future<>::Make();
+    fut.MarkFinished();
+    auto transferred = executor()->Transfer(fut, /*always_transfer=*/true);

Review comment:
       Nit, but it would probably be nicer to be able to spell this as 
`TransferAlways(fut)`.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -453,30 +480,35 @@ class Future {
   /// cyclic reference to itself through the callback.
   template <typename OnComplete>
   typename std::enable_if<!detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     // We know impl_ will not be dangling when invoking callbacks because at 
least one
     // thread will be waiting for MarkFinished to return. Thus it's safe to 
keep a
     // weak reference to impl_ here
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().result()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(*static_cast<Result<ValueType>*>(impl.result_.get()));
+      }
       OnComplete on_complete;
     };
-    impl_->AddCallback(Callback{WeakFuture<T>(*this), std::move(on_complete)});
+    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
   }
 
   /// Overload for callbacks accepting a Status
   template <typename OnComplete>
   typename std::enable_if<detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     static_assert(std::is_same<internal::Empty, ValueType>::value,
                   "Callbacks for Future<> should accept Status and not 
Result");
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().status()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(
+            static_cast<Result<ValueType>*>(impl.result_.get())->status());

Review comment:
       Can you make the `static_cast<>` dance a method on `FutureImpl`?

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -952,6 +951,85 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+class FutureSchedulingTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = 
std::make_shared<MockExecutor>();
+};
+
+TEST_F(FutureSchedulingTest, ScheduleAlways) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::ALWAYS;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(4, spawn_count());
+  }
+}
+
+TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::IF_UNFINISHED;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(1, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+}
+
+class DelayedExecutor : public internal::Executor {

Review comment:
       It's a bit weird to have this in a private test file, and the mock 
executor in a `.h`.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, 
FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != 
FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run 
synchronously
+enum ShouldSchedule {
+  /// Always run the callback synchronously (the default)
+  NEVER = 0,
+  /// Schedule a new task only if the future is not finished when the
+  /// callback is added
+  IF_UNFINISHED = 1,
+  /// Always schedule the callback as a new task
+  ALWAYS = 2
+};
+
+/// \brief Options that control how a continuation is run
+struct CallbackOptions {
+  /// Describes whether the callback should be run synchronously or scheduled

Review comment:
       Use "Describe" rather than "Describes".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to