This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ae8dc747c ARROW-17087: [C++] Race condition in scanner test (#13651)
1ae8dc747c is described below

commit 1ae8dc747c6835069c652874124dae07c50a3bf4
Author: Weston Pace <[email protected]>
AuthorDate: Thu Jul 21 00:00:08 2022 -1000

    ARROW-17087: [C++] Race condition in scanner test (#13651)
    
    The ExecPlan::AddTask variant that takes a future was problematic
    because a future is not created until a task has already been started
    and, if the task group was already ended, it wasn't tracking the
    future.  So work would be started that would not be tracked anywhere
    and the plan was ending prematurely.  As a fix I changed that AddTask
    variant to BeginExternalTask.  This method will request permission to
    start the task before the future is created.  At some point the
    scanner should [...]
    
    Lead-authored-by: Weston Pace <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/compute/exec/exec_plan.cc   |  26 +++-
 cpp/src/arrow/compute/exec/exec_plan.h    |  18 +--
 cpp/src/arrow/compute/exec/source_node.cc |  23 +--
 cpp/src/arrow/util/async_util.cc          |  14 +-
 cpp/src/arrow/util/async_util.h           |  12 +-
 cpp/src/arrow/util/future.cc              | 215 +-------------------------
 cpp/src/arrow/util/future.h               | 118 +--------------
 cpp/src/arrow/util/future_iterator.h      |  75 ---------
 cpp/src/arrow/util/future_test.cc         | 242 +++---------------------------
 cpp/src/arrow/util/thread_pool_test.cc    |   2 +-
 10 files changed, 83 insertions(+), 662 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc 
b/cpp/src/arrow/compute/exec/exec_plan.cc
index e248782ded..a7cd3472be 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -74,15 +74,26 @@ struct ExecPlanImpl : public ExecPlan {
     return nodes_.back().get();
   }
 
-  Status AddFuture(Future<> fut) { return 
task_group_.AddTaskIfNotEnded(std::move(fut)); }
+  Result<Future<>> BeginExternalTask() {
+    Future<> completion_future = Future<>::Make();
+    ARROW_ASSIGN_OR_RAISE(bool task_added,
+                          task_group_.AddTaskIfNotEnded(completion_future));
+    if (task_added) {
+      return std::move(completion_future);
+    }
+    // Return an invalid future if we were already finished to signal to the
+    // caller that they should not begin the task
+    return Future<>{};
+  }
 
   Status ScheduleTask(std::function<Status()> fn) {
     auto executor = exec_context_->executor();
     if (!executor) return fn();
-    // Atomically submit fn to the executor, and if successful
-    // add it to the task group.
-    return task_group_.AddTaskIfNotEnded(
-        [executor, fn]() { return executor->Submit(std::move(fn)); });
+    // Adds a task which submits fn to the executor and tracks its progress.  
If we're
+    // already stopping then the task is ignored and fn is not executed.
+    return task_group_
+        .AddTaskIfNotEnded([executor, fn]() { return 
executor->Submit(std::move(fn)); })
+        .status();
   }
 
   Status ScheduleTask(std::function<Status(size_t)> fn) {
@@ -355,9 +366,10 @@ const ExecPlan::NodeVector& ExecPlan::sinks() const { 
return ToDerived(this)->si
 size_t ExecPlan::GetThreadIndex() { return ToDerived(this)->GetThreadIndex(); }
 size_t ExecPlan::max_concurrency() const { return 
ToDerived(this)->max_concurrency(); }
 
-Status ExecPlan::AddFuture(Future<> fut) {
-  return ToDerived(this)->AddFuture(std::move(fut));
+Result<Future<>> ExecPlan::BeginExternalTask() {
+  return ToDerived(this)->BeginExternalTask();
 }
+
 Status ExecPlan::ScheduleTask(std::function<Status()> fn) {
   return ToDerived(this)->ScheduleTask(std::move(fn));
 }
diff --git a/cpp/src/arrow/compute/exec/exec_plan.h 
b/cpp/src/arrow/compute/exec/exec_plan.h
index c8599748de..d2663972f2 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.h
+++ b/cpp/src/arrow/compute/exec/exec_plan.h
@@ -69,17 +69,17 @@ class ARROW_EXPORT ExecPlan : public 
std::enable_shared_from_this<ExecPlan> {
   /// e.g. make an array of thread-locals off this.
   size_t max_concurrency() const;
 
-  /// \brief Add a future to the plan's task group.
+  /// \brief Start an external task
   ///
-  /// \param fut The future to add
+  /// This should be avoided if possible.  It is kept in for now for legacy
+  /// purposes.  This should be called before the external task is started.  If
+  /// a valid future is returned then it should be marked complete when the
+  /// external task has finished.
   ///
-  /// Use this when interfacing with anything that returns a future (such as 
IO), but
-  /// prefer ScheduleTask/StartTaskGroup inside of ExecNodes.
-  /// The below API interfaces with the scheduler to add tasks to the task 
group. Tasks
-  /// should be added sparingly! Prefer just doing the work immediately rather 
than adding
-  /// a task for it. Tasks are used in pipeline breakers that may output many 
more rows
-  /// than they received (such as a full outer join).
-  Status AddFuture(Future<> fut);
+  /// \return an invalid future if the plan has already ended, otherwise this
+  ///         returns a future that must be completed when the external task
+  ///         finishes.
+  Result<Future<>> BeginExternalTask();
 
   /// \brief Add a single function as a task to the plan's task group.
   ///
diff --git a/cpp/src/arrow/compute/exec/source_node.cc 
b/cpp/src/arrow/compute/exec/source_node.cc
index 33072f0026..677659c815 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -85,6 +85,7 @@ struct SourceNode : ExecNode {
       if (stop_requested_) {
         return Status::OK();
       }
+      started_ = true;
     }
 
     CallbackOptions options;
@@ -97,7 +98,12 @@ struct SourceNode : ExecNode {
       options.executor = executor;
       options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
-    started_ = true;
+    ARROW_ASSIGN_OR_RAISE(Future<> scan_task, plan_->BeginExternalTask());
+    if (!scan_task.is_valid()) {
+      finished_.MarkFinished();
+      // Plan has already been aborted, no need to start scanning
+      return Status::OK();
+    }
     auto fut = Loop([this, options] {
                  std::unique_lock<std::mutex> lock(mutex_);
                  int total_batches = batch_count_++;
@@ -111,7 +117,6 @@ struct SourceNode : ExecNode {
                          -> Future<ControlFlow<int>> {
                        std::unique_lock<std::mutex> lock(mutex_);
                        if (IsIterationEnd(maybe_batch) || stop_requested_) {
-                         stop_requested_ = true;
                          return Break(total_batches);
                        }
                        lock.unlock();
@@ -129,23 +134,19 @@ struct SourceNode : ExecNode {
                        return 
Future<ControlFlow<int>>::MakeFinished(Continue());
                      },
                      [=](const Status& error) -> ControlFlow<int> {
-                       std::unique_lock<std::mutex> lock(mutex_);
-                       stop_requested_ = true;
-                       lock.unlock();
                        outputs_[0]->ErrorReceived(this, error);
-                       finished_.MarkFinished(error);
                        return Break(total_batches);
                      },
                      options);
                })
                    .Then(
-                       [=](int total_batches) {
+                       [this, scan_task](int total_batches) mutable {
                          outputs_[0]->InputFinished(this, total_batches);
-                         if (!finished_.is_finished()) 
finished_.MarkFinished();
+                         scan_task.MarkFinished();
+                         finished_.MarkFinished();
                        },
                        {}, options);
     if (!executor && finished_.is_finished()) return finished_.status();
-    RETURN_NOT_OK(plan_->AddFuture(fut));
     return Status::OK();
   }
 
@@ -186,7 +187,9 @@ struct SourceNode : ExecNode {
   void StopProducing() override {
     std::unique_lock<std::mutex> lock(mutex_);
     stop_requested_ = true;
-    if (!started_) finished_.MarkFinished();
+    if (!started_) {
+      finished_.MarkFinished();
+    }
   }
 
  private:
diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc
index 31c4ef5892..d8ae2e1923 100644
--- a/cpp/src/arrow/util/async_util.cc
+++ b/cpp/src/arrow/util/async_util.cc
@@ -57,10 +57,10 @@ Status 
AsyncTaskGroup::AddTask(std::function<Result<Future<>>()> task) {
   return AddTaskUnlocked(*maybe_task_fut, std::move(guard));
 }
 
-Status AsyncTaskGroup::AddTaskIfNotEnded(std::function<Result<Future<>>()> 
task) {
+Result<bool> 
AsyncTaskGroup::AddTaskIfNotEnded(std::function<Result<Future<>>()> task) {
   auto guard = mutex_.Lock();
   if (finished_adding_) {
-    return Status::OK();
+    return false;
   }
   if (!err_.ok()) {
     return err_;
@@ -70,7 +70,8 @@ Status 
AsyncTaskGroup::AddTaskIfNotEnded(std::function<Result<Future<>>()> task)
     err_ = maybe_task_fut.status();
     return err_;
   }
-  return AddTaskUnlocked(*maybe_task_fut, std::move(guard));
+  ARROW_RETURN_NOT_OK(AddTaskUnlocked(*maybe_task_fut, std::move(guard)));
+  return true;
 }
 
 Status AsyncTaskGroup::AddTaskUnlocked(const Future<>& task_fut,
@@ -105,15 +106,16 @@ Status AsyncTaskGroup::AddTask(const Future<>& task_fut) {
   return AddTaskUnlocked(task_fut, std::move(guard));
 }
 
-Status AsyncTaskGroup::AddTaskIfNotEnded(const Future<>& task_fut) {
+Result<bool> AsyncTaskGroup::AddTaskIfNotEnded(const Future<>& task_fut) {
   auto guard = mutex_.Lock();
   if (finished_adding_) {
-    return Status::OK();
+    return false;
   }
   if (!err_.ok()) {
     return err_;
   }
-  return AddTaskUnlocked(task_fut, std::move(guard));
+  ARROW_RETURN_NOT_OK(AddTaskUnlocked(task_fut, std::move(guard)));
+  return true;
 }
 
 Future<> AsyncTaskGroup::End() {
diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h
index e7901aad1c..b3ff682996 100644
--- a/cpp/src/arrow/util/async_util.h
+++ b/cpp/src/arrow/util/async_util.h
@@ -117,11 +117,17 @@ class ARROW_EXPORT AsyncTaskGroup {
   /// completed then adding a task will fail.
   Status AddTask(std::function<Result<Future<>>()> task);
   /// Same as AddTask but doesn't add the task if End() has been called.
-  Status AddTaskIfNotEnded(std::function<Result<Future<>>()> task);
+  ///
+  /// \return true if the task was started, false if the group had already 
ended
+  Result<bool> AddTaskIfNotEnded(std::function<Result<Future<>>()> task);
   /// Add a task that has already been started
   Status AddTask(const Future<>& task);
-  /// Same as AddTask but doesn't add the task if End() has been called.
-  Status AddTaskIfNotEnded(const Future<>& task);
+  /// \brief Attempt to add a task that has already been started to this 
group's tracking
+  ///
+  /// The return value must be paid attention to.  If the return value is 
false then the
+  /// task could not be added because the group had already ended and so the 
caller must
+  /// track the external task some other way.
+  Result<bool> AddTaskIfNotEnded(const Future<>& task);
   /// Signal that top level tasks are done being added
   ///
   /// It is allowed for tasks to be added after this call provided the future 
has not yet
diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc
index ab59234dea..14e8b6eaa3 100644
--- a/cpp/src/arrow/util/future.cc
+++ b/cpp/src/arrow/util/future.cc
@@ -33,202 +33,8 @@ namespace arrow {
 
 using internal::checked_cast;
 
-// Shared mutex for all FutureWaiter instances.
-// This simplifies lock management compared to a per-waiter mutex.
-// The locking order is: global waiter mutex, then per-future mutex.
-//
-// It is unlikely that many waiter instances are alive at once, so this
-// should ideally not limit scalability.
-static std::mutex global_waiter_mutex;
-
-const double FutureWaiter::kInfinity = HUGE_VAL;
-
-class FutureWaiterImpl : public FutureWaiter {
- public:
-  FutureWaiterImpl(Kind kind, std::vector<FutureImpl*> futures)
-      : signalled_(false),
-        kind_(kind),
-        futures_(std::move(futures)),
-        one_failed_(-1),
-        fetch_pos_(0) {
-    finished_futures_.reserve(futures_.size());
-
-    // Observe the current state of futures and add waiters to receive future
-    // state changes, atomically per future.
-    // We need to lock ourselves, because as soon as SetWaiter() is called,
-    // a FutureImpl may call MarkFutureFinished() from another thread
-    // before this constructor finishes.
-    std::unique_lock<std::mutex> lock(global_waiter_mutex);
-
-    for (int i = 0; i < static_cast<int>(futures_.size()); ++i) {
-      const auto state = futures_[i]->SetWaiter(this, i);
-      if (IsFutureFinished(state)) {
-        finished_futures_.push_back(i);
-      }
-      if (state != FutureState::SUCCESS) {
-        one_failed_ = i;
-      }
-    }
-
-    // Maybe signal the waiter, if the ending condition is already satisfied
-    if (ShouldSignal()) {
-      // No need to notify non-existent Wait() calls
-      signalled_ = true;
-    }
-  }
-
-  ~FutureWaiterImpl() override {
-    for (auto future : futures_) {
-      future->RemoveWaiter(this);
-    }
-  }
-
-  // Is the ending condition satisfied?
-  bool ShouldSignal() {
-    bool do_signal = false;
-    switch (kind_) {
-      case ANY:
-        do_signal = (finished_futures_.size() > 0);
-        break;
-      case ALL:
-        do_signal = (finished_futures_.size() == futures_.size());
-        break;
-      case ALL_OR_FIRST_FAILED:
-        do_signal = (finished_futures_.size() == futures_.size()) || 
one_failed_ >= 0;
-        break;
-      case ITERATE:
-        do_signal = (finished_futures_.size() > 
static_cast<size_t>(fetch_pos_));
-        break;
-    }
-    return do_signal;
-  }
-
-  void Signal() {
-    signalled_ = true;
-    cv_.notify_one();
-  }
-
-  void DoWaitUnlocked(std::unique_lock<std::mutex>* lock) {
-    cv_.wait(*lock, [this] { return signalled_.load(); });
-  }
-
-  bool DoWait() {
-    if (signalled_) {
-      return true;
-    }
-    std::unique_lock<std::mutex> lock(global_waiter_mutex);
-    DoWaitUnlocked(&lock);
-    return true;
-  }
-
-  template <class Rep, class Period>
-  bool DoWait(const std::chrono::duration<Rep, Period>& duration) {
-    if (signalled_) {
-      return true;
-    }
-    std::unique_lock<std::mutex> lock(global_waiter_mutex);
-    cv_.wait_for(lock, duration, [this] { return signalled_.load(); });
-    return signalled_.load();
-  }
-
-  void DoMarkFutureFinishedUnlocked(int future_num, FutureState state) {
-    finished_futures_.push_back(future_num);
-    if (state != FutureState::SUCCESS) {
-      one_failed_ = future_num;
-    }
-    if (!signalled_ && ShouldSignal()) {
-      Signal();
-    }
-  }
-
-  int DoWaitAndFetchOne() {
-    std::unique_lock<std::mutex> lock(global_waiter_mutex);
-
-    DCHECK_EQ(kind_, ITERATE);
-    DoWaitUnlocked(&lock);
-    DCHECK_LT(static_cast<size_t>(fetch_pos_), finished_futures_.size());
-    if (static_cast<size_t>(fetch_pos_) == finished_futures_.size() - 1) {
-      signalled_ = false;
-    }
-    return finished_futures_[fetch_pos_++];
-  }
-
-  std::vector<int> DoMoveFinishedFutures() {
-    std::unique_lock<std::mutex> lock(global_waiter_mutex);
-
-    return std::move(finished_futures_);
-  }
-
- protected:
-  std::condition_variable cv_;
-  std::atomic<bool> signalled_;
-
-  Kind kind_;
-  std::vector<FutureImpl*> futures_;
-  std::vector<int> finished_futures_;
-  int one_failed_;
-  int fetch_pos_;
-};
-
-namespace {
-
-FutureWaiterImpl* GetConcreteWaiter(FutureWaiter* waiter) {
-  return checked_cast<FutureWaiterImpl*>(waiter);
-}
-
-}  // namespace
-
-FutureWaiter::FutureWaiter() = default;
-
-FutureWaiter::~FutureWaiter() = default;
-
-std::unique_ptr<FutureWaiter> FutureWaiter::Make(Kind kind,
-                                                 std::vector<FutureImpl*> 
futures) {
-  return std::unique_ptr<FutureWaiter>(new FutureWaiterImpl(kind, 
std::move(futures)));
-}
-
-void FutureWaiter::MarkFutureFinishedUnlocked(int future_num, FutureState 
state) {
-  // Called by FutureImpl on state changes
-  GetConcreteWaiter(this)->DoMarkFutureFinishedUnlocked(future_num, state);
-}
-
-bool FutureWaiter::Wait(double seconds) {
-  if (seconds == kInfinity) {
-    return GetConcreteWaiter(this)->DoWait();
-  } else {
-    return 
GetConcreteWaiter(this)->DoWait(std::chrono::duration<double>(seconds));
-  }
-}
-
-int FutureWaiter::WaitAndFetchOne() {
-  return GetConcreteWaiter(this)->DoWaitAndFetchOne();
-}
-
-std::vector<int> FutureWaiter::MoveFinishedFutures() {
-  return GetConcreteWaiter(this)->DoMoveFinishedFutures();
-}
-
 class ConcreteFutureImpl : public FutureImpl {
  public:
-  FutureState DoSetWaiter(FutureWaiter* w, int future_num) {
-    std::unique_lock<std::mutex> lock(mutex_);
-
-    // Atomically load state at the time of adding the waiter, to avoid
-    // missed or duplicate events in the caller
-    ARROW_CHECK_EQ(waiter_, nullptr)
-        << "Only one Waiter allowed per Future at any given time";
-    waiter_ = w;
-    waiter_arg_ = future_num;
-    return state_.load();
-  }
-
-  void DoRemoveWaiter(FutureWaiter* w) {
-    std::unique_lock<std::mutex> lock(mutex_);
-
-    ARROW_CHECK_EQ(waiter_, w);
-    waiter_ = nullptr;
-  }
-
   void DoMarkFinished() { DoMarkFinishedOrFailed(FutureState::SUCCESS); }
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
@@ -317,9 +123,6 @@ class ConcreteFutureImpl : public FutureImpl {
     std::vector<CallbackRecord> callbacks;
     std::shared_ptr<FutureImpl> self;
     {
-      // Lock the hypothetical waiter first, and the future after.
-      // This matches the locking order done in FutureWaiter constructor.
-      std::unique_lock<std::mutex> waiter_lock(global_waiter_mutex);
       std::unique_lock<std::mutex> lock(mutex_);
 #ifdef ARROW_WITH_OPENTELEMETRY
       if (this->span_) {
@@ -336,11 +139,11 @@ class ConcreteFutureImpl : public FutureImpl {
       }
 
       state_ = state;
-      if (waiter_ != nullptr) {
-        waiter_->MarkFutureFinishedUnlocked(waiter_arg_, state);
-      }
+      // We need to notify while holding the lock.  This notify often triggers
+      // waiters to delete the future and it is not safe to delete a cv_ while
+      // it is performing a notify_all
+      cv_.notify_all();
     }
-    cv_.notify_all();
     if (callbacks.empty()) return;
 
     // run callbacks, lock not needed since the future is finished by this
@@ -370,8 +173,6 @@ class ConcreteFutureImpl : public FutureImpl {
 
   std::mutex mutex_;
   std::condition_variable cv_;
-  FutureWaiter* waiter_ = nullptr;
-  int waiter_arg_ = -1;
 };
 
 namespace {
@@ -394,14 +195,6 @@ std::unique_ptr<FutureImpl> 
FutureImpl::MakeFinished(FutureState state) {
 
 FutureImpl::FutureImpl() : state_(FutureState::PENDING) {}
 
-FutureState FutureImpl::SetWaiter(FutureWaiter* w, int future_num) {
-  return GetConcreteFuture(this)->DoSetWaiter(w, future_num);
-}
-
-void FutureImpl::RemoveWaiter(FutureWaiter* w) {
-  GetConcreteFuture(this)->DoRemoveWaiter(w);
-}
-
 void FutureImpl::Wait() { GetConcreteFuture(this)->DoWait(); }
 
 bool FutureImpl::Wait(double seconds) { return 
GetConcreteFuture(this)->DoWait(seconds); }
diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h
index 2ac26b7f20..3be4e334b1 100644
--- a/cpp/src/arrow/util/future.h
+++ b/cpp/src/arrow/util/future.h
@@ -284,10 +284,6 @@ class ARROW_EXPORT FutureImpl : public 
std::enable_shared_from_this<FutureImpl>
   bool TryAddCallback(const std::function<Callback()>& callback_factory,
                       CallbackOptions opts);
 
-  // Waiter API
-  inline FutureState SetWaiter(FutureWaiter* w, int future_num);
-  inline void RemoveWaiter(FutureWaiter* w);
-
   std::atomic<FutureState> state_{FutureState::PENDING};
 
   // Type erased storage for arbitrary results
@@ -305,63 +301,6 @@ class ARROW_EXPORT FutureImpl : public 
std::enable_shared_from_this<FutureImpl>
 #endif
 };
 
-// An object that waits on multiple futures at once.  Only one waiter
-// can be registered for each future at any time.
-class ARROW_EXPORT FutureWaiter {
- public:
-  enum Kind : int8_t { ANY, ALL, ALL_OR_FIRST_FAILED, ITERATE };
-
-  // HUGE_VAL isn't constexpr on Windows
-  // 
https://social.msdn.microsoft.com/Forums/vstudio/en-US/47e8b9ff-b205-4189-968e-ee3bc3e2719f/constexpr-compile-error?forum=vclanguage
-  static const double kInfinity;
-
-  static std::unique_ptr<FutureWaiter> Make(Kind kind, 
std::vector<FutureImpl*> futures);
-
-  template <typename FutureType>
-  static std::unique_ptr<FutureWaiter> Make(Kind kind,
-                                            const std::vector<FutureType>& 
futures) {
-    return Make(kind, ExtractFutures(futures));
-  }
-
-  virtual ~FutureWaiter();
-
-  bool Wait(double seconds = kInfinity);
-  int WaitAndFetchOne();
-
-  std::vector<int> MoveFinishedFutures();
-
- protected:
-  // Extract FutureImpls from Futures
-  template <typename FutureType,
-            typename Enable = 
std::enable_if<!std::is_pointer<FutureType>::value>>
-  static std::vector<FutureImpl*> ExtractFutures(const 
std::vector<FutureType>& futures) {
-    std::vector<FutureImpl*> base_futures(futures.size());
-    for (int i = 0; i < static_cast<int>(futures.size()); ++i) {
-      base_futures[i] = futures[i].impl_.get();
-    }
-    return base_futures;
-  }
-
-  // Extract FutureImpls from Future pointers
-  template <typename FutureType>
-  static std::vector<FutureImpl*> ExtractFutures(
-      const std::vector<FutureType*>& futures) {
-    std::vector<FutureImpl*> base_futures(futures.size());
-    for (int i = 0; i < static_cast<int>(futures.size()); ++i) {
-      base_futures[i] = futures[i]->impl_.get();
-    }
-    return base_futures;
-  }
-
-  FutureWaiter();
-  ARROW_DISALLOW_COPY_AND_ASSIGN(FutureWaiter);
-
-  inline void MarkFutureFinishedUnlocked(int future_num, FutureState state);
-
-  friend class FutureImpl;
-  friend class ConcreteFutureImpl;
-};
-
 // ---------------------------------------------------------------------
 // Public API
 
@@ -374,8 +313,7 @@ class ARROW_EXPORT FutureWaiter {
 /// status, possibly after running a computation function.
 ///
 /// The consumer API allows querying a Future's current state, wait for it
-/// to complete, or wait on multiple Futures at once (using WaitForAll,
-/// WaitForAny or AsCompletedIterator).
+/// to complete, and composing futures with callbacks.
 template <typename T>
 class ARROW_MUST_USE_TYPE Future {
  public:
@@ -444,9 +382,7 @@ class ARROW_MUST_USE_TYPE Future {
   /// \brief Wait for the Future to complete
   void Wait() const {
     CheckValid();
-    if (!IsFutureFinished(impl_->state())) {
-      impl_->Wait();
-    }
+    impl_->Wait();
   }
 
   /// \brief Wait for the Future to complete, or for the timeout to expire
@@ -456,9 +392,6 @@ class ARROW_MUST_USE_TYPE Future {
   /// concurrently.
   bool Wait(double seconds) const {
     CheckValid();
-    if (IsFutureFinished(impl_->state())) {
-      return true;
-    }
     return impl_->Wait(seconds);
   }
 
@@ -740,7 +673,6 @@ class ARROW_MUST_USE_TYPE Future {
 
   std::shared_ptr<FutureImpl> impl_;
 
-  friend class FutureWaiter;
   friend struct detail::ContinueFuture;
 
   template <typename U>
@@ -790,28 +722,6 @@ static Future<T> DeferNotOk(Result<Future<T>> 
maybe_future) {
   return std::move(maybe_future).MoveValueUnsafe();
 }
 
-/// \brief Wait for all the futures to end, or for the given timeout to expire.
-///
-/// `true` is returned if all the futures completed before the timeout was 
reached,
-/// `false` otherwise.
-template <typename T>
-inline bool WaitForAll(const std::vector<Future<T>>& futures,
-                       double seconds = FutureWaiter::kInfinity) {
-  auto waiter = FutureWaiter::Make(FutureWaiter::ALL, futures);
-  return waiter->Wait(seconds);
-}
-
-/// \brief Wait for all the futures to end, or for the given timeout to expire.
-///
-/// `true` is returned if all the futures completed before the timeout was 
reached,
-/// `false` otherwise.
-template <typename T>
-inline bool WaitForAll(const std::vector<Future<T>*>& futures,
-                       double seconds = FutureWaiter::kInfinity) {
-  auto waiter = FutureWaiter::Make(FutureWaiter::ALL, futures);
-  return waiter->Wait(seconds);
-}
-
 /// \brief Create a Future which completes when all of `futures` complete.
 ///
 /// The future's result is a vector of the results of `futures`.
@@ -867,30 +777,6 @@ Future<> AllComplete(const std::vector<Future<>>& futures);
 ARROW_EXPORT
 Future<> AllFinished(const std::vector<Future<>>& futures);
 
-/// \brief Wait for one of the futures to end, or for the given timeout to 
expire.
-///
-/// The indices of all completed futures are returned.  Note that some futures
-/// may not be in the returned set, but still complete concurrently.
-template <typename T>
-inline std::vector<int> WaitForAny(const std::vector<Future<T>>& futures,
-                                   double seconds = FutureWaiter::kInfinity) {
-  auto waiter = FutureWaiter::Make(FutureWaiter::ANY, futures);
-  waiter->Wait(seconds);
-  return waiter->MoveFinishedFutures();
-}
-
-/// \brief Wait for one of the futures to end, or for the given timeout to 
expire.
-///
-/// The indices of all completed futures are returned.  Note that some futures
-/// may not be in the returned set, but still complete concurrently.
-template <typename T>
-inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures,
-                                   double seconds = FutureWaiter::kInfinity) {
-  auto waiter = FutureWaiter::Make(FutureWaiter::ANY, futures);
-  waiter->Wait(seconds);
-  return waiter->MoveFinishedFutures();
-}
-
 /// @}
 
 struct Continue {
diff --git a/cpp/src/arrow/util/future_iterator.h 
b/cpp/src/arrow/util/future_iterator.h
deleted file mode 100644
index 9837ae853a..0000000000
--- a/cpp/src/arrow/util/future_iterator.h
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <cassert>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "arrow/util/future.h"
-#include "arrow/util/iterator.h"
-#include "arrow/util/macros.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-/// An iterator that takes a set of futures, and yields their results as
-/// they are completed, in any order.
-template <typename T>
-class AsCompletedIterator {
- public:
-  // Public default constructor creates an empty iterator
-  AsCompletedIterator();
-
-  explicit AsCompletedIterator(std::vector<Future<T>> futures)
-      : futures_(std::move(futures)),
-        waiter_(FutureWaiter::Make(FutureWaiter::ITERATE, futures_)) {}
-
-  ARROW_DEFAULT_MOVE_AND_ASSIGN(AsCompletedIterator);
-  ARROW_DISALLOW_COPY_AND_ASSIGN(AsCompletedIterator);
-
-  /// Return the results of the first completed, not-yet-returned Future.
-  ///
-  /// The result can be successful or not, depending on the Future's underlying
-  /// task's result.  Even if a Future returns a failed Result, you can still
-  /// call Next() to get further results.
-  Result<T> Next() {
-    if (n_fetched_ == futures_.size()) {
-      return IterationTraits<T>::End();
-    }
-    auto index = waiter_->WaitAndFetchOne();
-    ++n_fetched_;
-    assert(index >= 0 && static_cast<size_t>(index) < futures_.size());
-    auto& fut = futures_[index];
-    assert(IsFutureFinished(fut.state()));
-    return std::move(fut).result();
-  }
-
- private:
-  size_t n_fetched_ = 0;
-  std::vector<Future<T>> futures_;
-  std::unique_ptr<FutureWaiter> waiter_;
-};
-
-template <typename T>
-Iterator<T> MakeAsCompletedIterator(std::vector<Future<T>> futures) {
-  return Iterator<T>(AsCompletedIterator<T>(std::move(futures)));
-}
-
-}  // namespace arrow
diff --git a/cpp/src/arrow/util/future_test.cc 
b/cpp/src/arrow/util/future_test.cc
index f481018cdc..41b470c643 100644
--- a/cpp/src/arrow/util/future_test.cc
+++ b/cpp/src/arrow/util/future_test.cc
@@ -16,7 +16,6 @@
 // under the License.
 
 #include "arrow/util/future.h"
-#include "arrow/util/future_iterator.h"
 
 #include <algorithm>
 #include <chrono>
@@ -39,6 +38,7 @@
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/matchers.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
 #include "arrow/util/thread_pool.h"
 
 namespace arrow {
@@ -526,6 +526,23 @@ TEST(FutureStressTest, TryAddCallback) {
   }
 }
 
+TEST(FutureStressTest, DeleteAfterWait) {
+  constexpr int kNumTasks = 100;
+  for (int i = 0; i < kNumTasks; i++) {
+    {
+      std::unique_ptr<Future<>> future =
+          internal::make_unique<Future<>>(Future<>::Make());
+      std::thread t([&]() {
+        SleepABit();
+        future->MarkFinished();
+      });
+      ASSERT_TRUE(future->Wait(arrow::kDefaultAssertFinishesWaitSeconds));
+      future.reset();
+      t.join();
+    }
+  }
+}
+
 TEST(FutureCompletionTest, Void) {
   {
     // Simple callback
@@ -1486,204 +1503,6 @@ class FutureTestBase : public ::testing::Test {
     AssertAllSuccessful();
   }
 
-  void TestBasicWaitForAny() {
-    MakeExecutor(4, {{1, true}, {2, false}});
-    auto& futures = executor_->futures();
-
-    std::vector<Future<T>*> wait_on = {&futures[0], &futures[1]};
-    auto finished = WaitForAny(wait_on);
-    ASSERT_THAT(finished, testing::ElementsAre(1));
-
-    wait_on = {&futures[1], &futures[2], &futures[3]};
-    while (finished.size() < 2) {
-      finished = WaitForAny(wait_on);
-    }
-    ASSERT_THAT(finished, testing::UnorderedElementsAre(0, 1));
-
-    executor_->SetFinished(3);
-    finished = WaitForAny(futures);
-    ASSERT_THAT(finished, testing::UnorderedElementsAre(1, 2, 3));
-
-    executor_->SetFinishedDeferred(0);
-    // Busy wait until the state change is done
-    while (finished.size() < 4) {
-      finished = WaitForAny(futures);
-    }
-    ASSERT_THAT(finished, testing::UnorderedElementsAre(0, 1, 2, 3));
-  }
-
-  void TestTimedWaitForAny() {
-    MakeExecutor(4, {{1, true}, {2, false}});
-    auto& futures = executor_->futures();
-
-    std::vector<int> finished;
-    std::vector<Future<T>*> wait_on = {&futures[0], &futures[3]};
-    finished = WaitForAny(wait_on, kTinyWait);
-    ASSERT_EQ(finished.size(), 0);
-
-    executor_->SetFinished(3);
-    finished = WaitForAny(wait_on, kLargeWait);
-    ASSERT_THAT(finished, testing::ElementsAre(1));
-
-    executor_->SetFinished(0);
-    while (finished.size() < 2) {
-      finished = WaitForAny(wait_on, kTinyWait);
-    }
-    ASSERT_THAT(finished, testing::UnorderedElementsAre(0, 1));
-
-    while (finished.size() < 4) {
-      finished = WaitForAny(futures, kTinyWait);
-    }
-    ASSERT_THAT(finished, testing::UnorderedElementsAre(0, 1, 2, 3));
-  }
-
-  void TestBasicWaitForAll() {
-    MakeExecutor(4, {{1, true}, {2, false}});
-    auto& futures = executor_->futures();
-
-    std::vector<Future<T>*> wait_on = {&futures[1], &futures[2]};
-    WaitForAll(wait_on);
-    AssertSpanSuccessfulNow(1, 3);
-
-    executor_->SetFinishedDeferred({{0, true}, {3, false}});
-    WaitForAll(futures);
-    AssertAllSuccessfulNow();
-    WaitForAll(futures);
-  }
-
-  void TestTimedWaitForAll() {
-    MakeExecutor(4, {{1, true}, {2, false}});
-    auto& futures = executor_->futures();
-
-    ASSERT_FALSE(WaitForAll(futures, kTinyWait));
-
-    executor_->SetFinishedDeferred({{0, true}, {3, false}});
-    ASSERT_TRUE(WaitForAll(futures, kLargeWait));
-    AssertAllSuccessfulNow();
-  }
-
-  void TestStressWaitForAny() {
-#ifdef ARROW_VALGRIND
-    const int N = 5;
-#else
-    const int N = 300;
-#endif
-    MakeExecutor(N);
-    const auto& futures = executor_->futures();
-    const auto spans = RandomSequenceSpans(N);
-    std::vector<int> finished;
-    // Note this loop is potentially O(N**2), because we're copying
-    // O(N)-sized vector when waiting.
-    for (const auto& span : spans) {
-      int start = span.first, stop = span.second;
-      executor_->SetFinishedDeferred(start, stop);
-      size_t last_finished_size = finished.size();
-      finished = WaitForAny(futures);
-      ASSERT_GE(finished.size(), last_finished_size);
-      // The spans are contiguous and ordered, so `stop` is also the number
-      // of futures for which SetFinishedDeferred() was called.
-      ASSERT_LE(finished.size(), static_cast<size_t>(stop));
-    }
-    // Semi-busy wait for all futures to be finished
-    while (finished.size() < static_cast<size_t>(N)) {
-      finished = WaitForAny(futures);
-    }
-    AssertAllSuccessfulNow();
-  }
-
-  void TestStressWaitForAll() {
-#ifdef ARROW_VALGRIND
-    const int N = 5;
-#else
-    const int N = 300;
-#endif
-    MakeExecutor(N);
-    const auto& futures = executor_->futures();
-    const auto spans = RandomSequenceSpans(N);
-    // Note this loop is potentially O(N**2), because we're copying
-    // O(N)-sized vector when waiting.
-    for (const auto& span : spans) {
-      int start = span.first, stop = span.second;
-      executor_->SetFinishedDeferred(start, stop);
-      bool finished = WaitForAll(futures, kTinyWait);
-      if (stop < N) {
-        ASSERT_FALSE(finished);
-      }
-    }
-    ASSERT_TRUE(WaitForAll(futures, kLargeWait));
-    AssertAllSuccessfulNow();
-  }
-
-  void TestBasicAsCompleted() {
-    {
-      MakeExecutor(4, {{1, true}, {2, true}});
-      executor_->SetFinishedDeferred({{0, true}, {3, true}});
-      auto it = MakeAsCompletedIterator(executor_->futures());
-      std::vector<T> values = IteratorToVector(std::move(it));
-      ASSERT_THAT(values, testing::UnorderedElementsAre(0, 1, 2, 3));
-    }
-    {
-      // Check that AsCompleted is opportunistic, it yields elements in order
-      // of completion.
-      MakeExecutor(4, {{2, true}});
-      auto it = MakeAsCompletedIterator(executor_->futures());
-      ASSERT_OK_AND_EQ(2, it.Next());
-      executor_->SetFinishedDeferred({{3, true}});
-      ASSERT_OK_AND_EQ(3, it.Next());
-      executor_->SetFinishedDeferred({{0, true}});
-      ASSERT_OK_AND_EQ(0, it.Next());
-      executor_->SetFinishedDeferred({{1, true}});
-      ASSERT_OK_AND_EQ(1, it.Next());
-      ASSERT_OK_AND_EQ(IterationTraits<T>::End(), it.Next());
-      ASSERT_OK_AND_EQ(IterationTraits<T>::End(), it.Next());  // idempotent
-    }
-  }
-
-  void TestErrorsAsCompleted() {
-    MakeExecutor(4, {{1, true}, {2, false}});
-    executor_->SetFinishedDeferred({{0, true}, {3, false}});
-    auto it = MakeAsCompletedIterator(executor_->futures());
-    auto results = IteratorToResults(std::move(it));
-    ASSERT_THAT(results.values, testing::UnorderedElementsAre(0, 1));
-    ASSERT_EQ(results.errors.size(), 2);
-    ASSERT_RAISES(UnknownError, results.errors[0]);
-    ASSERT_RAISES(UnknownError, results.errors[1]);
-  }
-
-  void TestStressAsCompleted() {
-#ifdef ARROW_VALGRIND
-    const int N = 10;
-#else
-    const int N = 1000;
-#endif
-    MakeExecutor(N);
-
-    // Launch a worker thread that will finish random spans of futures,
-    // in random order.
-    auto spans = RandomSequenceSpans(N);
-    RandomShuffle(&spans);
-    auto feed_iterator = [&]() {
-      for (const auto& span : spans) {
-        int start = span.first, stop = span.second;
-        executor_->SetFinishedDeferred(start, stop);  // will sleep a bit
-      }
-    };
-    auto worker = std::thread(std::move(feed_iterator));
-    auto it = MakeAsCompletedIterator(executor_->futures());
-    auto results = IteratorToResults(std::move(it));
-    worker.join();
-
-    ASSERT_EQ(results.values.size(), static_cast<size_t>(N));
-    ASSERT_EQ(results.errors.size(), 0);
-    std::vector<int> expected(N);
-    std::iota(expected.begin(), expected.end(), 0);
-    std::vector<int> actual(N);
-    std::transform(results.values.begin(), results.values.end(), actual.begin(),
-                   [](const T& value) { return value.ToInt(); });
-    std::sort(actual.begin(), actual.end());
-    ASSERT_EQ(expected, actual);
-  }
-
  protected:
   std::unique_ptr<ExecutorType> executor_;
   int seed_ = 42;
@@ -1702,31 +1521,6 @@ TYPED_TEST(FutureWaitTest, TimedWait) { this->TestTimedWait(); }
 
 TYPED_TEST(FutureWaitTest, StressWait) { this->TestStressWait(); }
 
-TYPED_TEST(FutureWaitTest, BasicWaitForAny) { this->TestBasicWaitForAny(); }
-
-TYPED_TEST(FutureWaitTest, TimedWaitForAny) { this->TestTimedWaitForAny(); }
-
-TYPED_TEST(FutureWaitTest, StressWaitForAny) { this->TestStressWaitForAny(); }
-
-TYPED_TEST(FutureWaitTest, BasicWaitForAll) { this->TestBasicWaitForAll(); }
-
-TYPED_TEST(FutureWaitTest, TimedWaitForAll) { this->TestTimedWaitForAll(); }
-
-TYPED_TEST(FutureWaitTest, StressWaitForAll) { this->TestStressWaitForAll(); }
-
-template <typename T>
-class FutureIteratorTest : public FutureTestBase<T> {};
-
-using FutureIteratorTestTypes = ::testing::Types<Foo>;
-
-TYPED_TEST_SUITE(FutureIteratorTest, FutureIteratorTestTypes);
-
-TYPED_TEST(FutureIteratorTest, BasicAsCompleted) { this->TestBasicAsCompleted(); }
-
-TYPED_TEST(FutureIteratorTest, ErrorsAsCompleted) { this->TestErrorsAsCompleted(); }
-
-TYPED_TEST(FutureIteratorTest, StressAsCompleted) { this->TestStressAsCompleted(); }
-
 namespace internal {
 TEST(FnOnceTest, MoveOnlyDataType) {
   // ensuring this is valid guarantees we are making no unnecessary copies
diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc
index b61e964a19..7980965171 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -771,7 +771,7 @@ TEST_F(TestThreadPoolForkSafety, Basics) {
     // Fork after task submission
     auto pool = this->MakeThreadPool(3);
     ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
-    ASSERT_OK_AND_EQ(9, fut.result());
+    ASSERT_FINISHES_OK_AND_EQ(9, fut);
 
     auto child_pid = fork();
     if (child_pid == 0) {

Reply via email to