lidavidm commented on code in PR #13912:
URL: https://github.com/apache/arrow/pull/13912#discussion_r950404825


##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -19,186 +19,334 @@
 
 #include "arrow/util/future.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+
+#include <deque>
+#include <iostream>
+#include <list>
+#include <mutex>
 
 namespace arrow {
-namespace util {
 
-AsyncDestroyable::AsyncDestroyable() : on_closed_(Future<>::Make()) {}
+using internal::make_unique;
 
-#ifndef NDEBUG
-AsyncDestroyable::~AsyncDestroyable() {
-  DCHECK(constructed_correctly_) << "An instance of AsyncDestroyable must be 
created by "
-                                    "MakeSharedAsync or MakeUniqueAsync";
-}
-#else
-AsyncDestroyable::~AsyncDestroyable() = default;
-#endif
-
-void AsyncDestroyable::Destroy() {
-  DoDestroy().AddCallback([this](const Status& st) {
-    on_closed_.MarkFinished(st);
-    delete this;
-  });
-}
+namespace util {
 
-Status AsyncTaskGroup::AddTask(std::function<Result<Future<>>()> task) {
-  auto guard = mutex_.Lock();
-  if (finished_adding_) {
-    return Status::Cancelled("Ignoring task added after the task group has 
been ended");
-  }
-  if (!err_.ok()) {
-    return err_;
-  }
-  Result<Future<>> maybe_task_fut = task();
-  if (!maybe_task_fut.ok()) {
-    err_ = maybe_task_fut.status();
-    return err_;
-  }
-  return AddTaskUnlocked(*maybe_task_fut, std::move(guard));
-}
+class ThrottleImpl : public AsyncTaskScheduler::Throttle {
+ public:
+  ThrottleImpl(int max_concurrent_cost) : available_cost_(max_concurrent_cost) 
{}
 
-Result<bool> 
AsyncTaskGroup::AddTaskIfNotEnded(std::function<Result<Future<>>()> task) {
-  auto guard = mutex_.Lock();
-  if (finished_adding_) {
-    return false;
-  }
-  if (!err_.ok()) {
-    return err_;
-  }
-  Result<Future<>> maybe_task_fut = task();
-  if (!maybe_task_fut.ok()) {
-    err_ = maybe_task_fut.status();
-    return err_;
+  util::optional<Future<>> TryAcquire(int amt) override {
+    std::lock_guard<std::mutex> lk(mutex_);
+    if (backoff_.is_valid()) {
+      return backoff_;
+    }
+    if (amt <= available_cost_) {
+      available_cost_ -= amt;
+      return nullopt;
+    }
+    backoff_ = Future<>::Make();
+    return backoff_;
   }
-  ARROW_RETURN_NOT_OK(AddTaskUnlocked(*maybe_task_fut, std::move(guard)));
-  return true;
-}
 
-Status AsyncTaskGroup::AddTaskUnlocked(const Future<>& task_fut,
-                                       util::Mutex::Guard guard) {
-  // If the task is already finished there is nothing to track so lets save
-  // some work and return early
-  if (task_fut.is_finished()) {
-    err_ &= task_fut.status();
-    return err_;
+  void Release(int amt) override {
+    Future<> backoff_to_fulfill;
+    {
+      std::lock_guard<std::mutex> lk(mutex_);
+      available_cost_ += amt;
+      if (backoff_.is_valid()) {
+        backoff_to_fulfill = std::move(backoff_);
+      }
+    }
+    if (backoff_to_fulfill.is_valid()) {
+      backoff_to_fulfill.MarkFinished();
+    }
   }
-  running_tasks_++;
-  guard.Unlock();
-  task_fut.AddCallback([this](const Status& st) {
-    auto guard = mutex_.Lock();
-    err_ &= st;
-    if (--running_tasks_ == 0 && finished_adding_) {
-      guard.Unlock();
-      all_tasks_done_.MarkFinished(err_);
-    }
-  });
-  return Status::OK();
+
+ private:
+  std::mutex mutex_;
+  int available_cost_;
+  Future<> backoff_;
+};
+
+std::unique_ptr<AsyncTaskScheduler::Throttle> AsyncTaskScheduler::MakeThrottle(
+    int max_concurrent_cost) {
+  return make_unique<ThrottleImpl>(max_concurrent_cost);
 }
 
-Status AsyncTaskGroup::AddTask(const Future<>& task_fut) {
-  auto guard = mutex_.Lock();
-  if (finished_adding_) {
-    return Status::Cancelled("Ignoring task added after the task group has 
been ended");
-  }
-  if (!err_.ok()) {
-    return err_;
+namespace {
+
+// Very basic FIFO queue
+class FifoQueue : public AsyncTaskScheduler::Queue {
+  using Task = AsyncTaskScheduler::Task;
+  void Push(std::unique_ptr<Task> task) override { 
tasks_.push_back(std::move(task)); }
+
+  std::unique_ptr<Task> Pop() override {
+    std::unique_ptr<Task> task = std::move(tasks_.front());
+    tasks_.pop_front();
+    return task;
   }
-  return AddTaskUnlocked(task_fut, std::move(guard));
-}
 
-Result<bool> AsyncTaskGroup::AddTaskIfNotEnded(const Future<>& task_fut) {
-  auto guard = mutex_.Lock();
-  if (finished_adding_) {
-    return false;
+  const Task& Peek() override { return *tasks_.front(); }
+
+  bool Empty() override { return tasks_.empty(); }
+
+  void Purge() override { tasks_.clear(); }
+
+ private:
+  std::list<std::unique_ptr<Task>> tasks_;

Review Comment:
   nit: use a `std::deque` instead of a linked list?



##########
cpp/src/arrow/util/async_util.h:
##########
@@ -147,70 +207,42 @@ class ARROW_EXPORT AsyncTaskGroup {
   ///
   /// This is a utility method for workflows where the finish future needs to 
be
   /// referenced before all top level tasks have been queued.
-  Future<> OnFinished() const;
+  virtual Future<> OnFinished() const = 0;
 
- private:
-  Status AddTaskUnlocked(const Future<>& task, util::Mutex::Guard guard);
-
-  bool finished_adding_ = false;
-  int running_tasks_ = 0;
-  Status err_;
-  Future<> all_tasks_done_ = Future<>::Make();
-  util::Mutex mutex_;
-};
-
-/// A task group which serializes asynchronous tasks in a push-based workflow
-///
-/// Tasks will be executed in the order they are added
-///
-/// This will buffer results in an unlimited fashion so it should be combined
-/// with some kind of backpressure
-class ARROW_EXPORT SerializedAsyncTaskGroup {
- public:
-  SerializedAsyncTaskGroup();
-  /// Push an item into the serializer and (eventually) into the consumer
+  /// Create a sub-scheduler for tracking a subset of tasks
   ///
-  /// The item will not be delivered to the consumer until all previous items 
have been
-  /// consumed.
+  /// The parent scheduler will manage the lifetime of the sub-scheduler.  It 
will
+  /// be destroyed once it is finished.
   ///
-  /// If the consumer returns an error then this serializer will go into an 
error state
-  /// and all subsequent pushes will fail with that error.  Pushes that have 
been queued
-  /// but not delivered will be silently dropped.
+  /// Often some state needs to be associated with a subset of tasks.
+  /// For example, when scanning a dataset we need to keep a file reader
+  /// alive for all of the read tasks for each file. A sub-scheduler can be 
used to do
+  /// this.
   ///
-  /// \return True if the item was pushed immediately to the consumer, false 
if it was
-  /// queued
-  Status AddTask(std::function<Result<Future<>>()> task);
-
-  /// Signal that all top level tasks have been added
+  /// The parent scheduler may be ended before all of its sub-schedulers
+  /// are ended.
   ///
-  /// The returned future that will finish when all tasks have been consumed.
-  Future<> End();
-
-  /// Abort a task group
+  /// If either the parent scheduler or the sub-scheduler encounters an error
+  /// then they will both enter an aborted state (this is a shared state).
+  /// Finish callbacks will not be run when the scheduler is aborted.
   ///
-  /// Tasks that have not been started will be discarded
+  /// The parent scheduler will not complete until the sub-scheduler's
+  /// tasks (and finish callback) have all executed.
   ///
-  /// The returned future will finish when all running tasks have finished.
-  Future<> Abort(Status err);
+  /// A sub-scheduler can share the same throttle as its parent but it
+  /// can also have its own unique throttle.

Review Comment:
   This means a sub-scheduler can schedule more tasks even if the parent 
wouldn't normally be able to?



##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -488,13 +504,15 @@ class TeeNode : public compute::MapNode {
   const char* kind_name() const override { return "TeeNode"; }
 
   void Finish(Status finish_st) override {
-    dataset_writer_->Finish().AddCallback([this, finish_st](const Status& 
dw_status) {
-      // Need to wait for the task group to complete regardless of dw_status
-      task_group_.End().AddCallback(
-          [this, dw_status, finish_st](const Status& tg_status) {
-            finished_.MarkFinished(dw_status & finish_st & tg_status);
-          });
-    });
+    if (!finish_st.ok()) {
+      MapNode::Finish(std::move(finish_st));
+    }
+    Status writer_finish_st = dataset_writer_->Finish();
+    if (!writer_finish_st.ok()) {
+      MapNode::Finish(std::move(writer_finish_st));
+    }
+    serial_scheduler_->End();
+    MapNode::Finish(Status::OK());

Review Comment:
   Doesn't this potentially call MapNode::Finish multiple times?



##########
cpp/src/arrow/util/async_util.h:
##########
@@ -17,128 +17,188 @@
 
 #pragma once
 
-#include <queue>
-
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/functional.h"
 #include "arrow/util/future.h"
+#include "arrow/util/make_unique.h"
 #include "arrow/util/mutex.h"
 
 namespace arrow {
-namespace util {
 
-/// Custom deleter for AsyncDestroyable objects
-template <typename T>
-struct DestroyingDeleter {
-  void operator()(T* p) {
-    if (p) {
-      p->Destroy();
-    }
-  }
-};
+using internal::FnOnce;
+using internal::make_unique;
 
-/// An object which should be asynchronously closed before it is destroyed
+namespace util {
+
+/// A utility which keeps track of, and schedules, asynchronous tasks
 ///
-/// Classes can extend this to ensure that the close method is called and 
completed
-/// before the instance is deleted.  This provides smart_ptr / delete 
semantics for
-/// objects with an asynchronous destructor.
+/// An asynchronous task has a synchronous component and an asynchronous 
component.
+/// The synchronous component typically schedules some kind of work on an 
external
+/// resource (e.g. the I/O thread pool or some kind of kernel-based 
asynchronous
+/// resource like io_uring).  The asynchronous part represents the work
+/// done on that external resource.  Executing the synchronous part will be 
referred
+/// to as "submitting the task" since this usually includes submitting the 
asynchronous
+/// portion to the external thread pool.
 ///
-/// Classes which extend this must be constructed using MakeSharedAsync or 
MakeUniqueAsync
-class ARROW_EXPORT AsyncDestroyable {
- public:
-  AsyncDestroyable();
-  virtual ~AsyncDestroyable();
-
-  /// A future which will complete when the AsyncDestroyable has finished and 
is ready
-  /// to be deleted.
-  ///
-  /// This can be used to ensure all work done by this object has been 
completed before
-  /// proceeding.
-  Future<> on_closed() { return on_closed_; }
-
- protected:
-  /// Subclasses should override this and perform any cleanup.  Once the 
future returned
-  /// by this method finishes then this object is eligible for destruction and 
any
-  /// reference to `this` will be invalid
-  virtual Future<> DoDestroy() = 0;
-
- private:
-  void Destroy();
-
-  Future<> on_closed_;
-#ifndef NDEBUG
-  bool constructed_correctly_ = false;
-#endif
-
-  template <typename T>
-  friend struct DestroyingDeleter;
-  template <typename T, typename... Args>
-  friend std::shared_ptr<T> MakeSharedAsync(Args&&... args);
-  template <typename T, typename... Args>
-  friend std::unique_ptr<T, DestroyingDeleter<T>> MakeUniqueAsync(Args&&... 
args);
-};
-
-template <typename T, typename... Args>
-std::shared_ptr<T> MakeSharedAsync(Args&&... args) {
-  static_assert(std::is_base_of<AsyncDestroyable, T>::value,
-                "Nursery::MakeSharedCloseable only works with AsyncDestroyable 
types");
-  std::shared_ptr<T> ptr(new T(std::forward<Args&&>(args)...), 
DestroyingDeleter<T>());
-#ifndef NDEBUG
-  ptr->constructed_correctly_ = true;
-#endif
-  return ptr;
-}
-
-template <typename T, typename... Args>
-std::unique_ptr<T, DestroyingDeleter<T>> MakeUniqueAsync(Args&&... args) {
-  static_assert(std::is_base_of<AsyncDestroyable, T>::value,
-                "Nursery::MakeUniqueCloseable only works with AsyncDestroyable 
types");
-  std::unique_ptr<T, DestroyingDeleter<T>> ptr(new 
T(std::forward<Args>(args)...),
-                                               DestroyingDeleter<T>());
-#ifndef NDEBUG
-  ptr->constructed_correctly_ = true;
-#endif
-  return ptr;
-}
-
-/// A utility which keeps track of a collection of asynchronous tasks
+/// By default the scheduler will submit the task (execute the synchronous 
part) as
+/// soon as it is added, assuming the underlying thread pool hasn't terminated 
or the
+/// scheduler hasn't aborted.  In this mode the scheduler is simply acting as
+/// a task group, keeping track of the ongoing work.
 ///
 /// This can be used to provide structured concurrency for asynchronous 
development.
 /// A task group created at a high level can be distributed amongst low level 
components
 /// which register work to be completed.  The high level job can then wait for 
all work
 /// to be completed before cleaning up.
-class ARROW_EXPORT AsyncTaskGroup {
+///
+/// A task scheduler must eventually be ended when all tasks have been added.  
Once the
+/// scheduler has been ended it is an error to add further tasks.  Note, it is 
not an
+/// error to add additional tasks after a scheduler has aborted (though these 
tasks
+/// will be ignored and never submitted).  The scheduler has a future which 
will complete
+/// once the scheduler has been ended AND all remaining tasks have finished 
executing.
+/// Ending a scheduler will NOT cause the scheduler to flush existing tasks.
+///
+/// Task failure (either the synchronous portion or the asynchronous portion) 
will cause
+/// the scheduler to enter an aborted state.  The first such failure will be 
reported in
+/// the final task future.
+///
+/// The scheduler can also be manually aborted.  A cancellation status will be 
reported as
+/// the final task future.
+///
+/// It is also possible to limit the number of concurrent tasks the scheduler 
will
+/// execute. This is done by setting a task limit.  The task limit initially 
assumes all
+/// tasks are equal but a custom cost can be supplied when scheduling a task 
(e.g. based
+/// on the total I/O cost of the task, or the expected RAM utilization of the 
task)
+///
+/// When the total number of running tasks is limited then scheduler priority 
may also
+/// become a consideration.  By default the scheduler runs with a FIFO queue 
but a custom
+/// task queue can be provided.  One could, for example, use a priority queue 
to control
+/// the order in which tasks are executed.
+///
+/// It is common to have multiple stages of execution.  For example, when 
scanning, we
+/// first inspect each fragment (the inspect stage) to figure out the row 
groups and then
+/// we scan row groups (the scan stage) to read in the data.  This sort of 
multi-stage
+/// execution should be represented as two separate task groups.  The first 
task group can
+/// then have a custom finish callback which ends the second task group.
+class ARROW_EXPORT AsyncTaskScheduler {
  public:
-  /// Add a task to be tracked by this task group
-  ///
-  /// If a previous task has failed then adding a task will fail
-  ///
-  /// If WaitForTasksToFinish has been called and the returned future has been 
marked
-  /// completed then adding a task will fail.
-  Status AddTask(std::function<Result<Future<>>()> task);
-  /// Same as AddTask but doesn't add the task if End() has been called.
-  ///
-  /// \return true if the task was started, false if the group had already 
ended
-  Result<bool> AddTaskIfNotEnded(std::function<Result<Future<>>()> task);
-  /// Add a task that has already been started
-  Status AddTask(const Future<>& task);
-  /// \brief Attempt to add a task that has already been started to this 
group's tracking
-  ///
-  /// The return value must be paid attention to.  If the return value is 
false then the
-  /// task could not be added because the group had already ended and so the 
caller must
-  /// track the external task some other way.
-  Result<bool> AddTaskIfNotEnded(const Future<>& task);
-  /// Signal that top level tasks are done being added
+  /// Destructor for AsyncTaskScheduler
+  ///
+  /// If a scheduler is not in the ended state when it is destroyed then it
+  /// will enter an aborted state.
+  ///
+  /// The destructor will block until all submitted tasks have finished.
+  virtual ~AsyncTaskScheduler() = default;
+  /// An interface for a task
+  ///
+  /// Users may want to override this, for example, to add priority
+  /// information for use by a queue.
+  class Task {
+   public:
+    virtual ~Task() = default;
+    /// Submit the task
+    ///
+    /// This will be called by the scheduler at most once when there
+    /// is space to run the task.  This is expected to be a fairly quick
+    /// function that simply submits the actual task work to an external
+    /// resource (e.g. I/O thread pool).
+    ///
+    /// If this call fails then the scheduler will enter an aborted state.
+    virtual Result<Future<>> operator()(AsyncTaskScheduler* scheduler) = 0;
+    /// The cost of the task
+    ///
+    /// The scheduler limits the total number of concurrent tasks based
+    /// on cost.  A custom cost may be used, for example, if you would like
+    /// to limit the number of tasks based on the total expected RAM usage of
+    /// the tasks (this is done in the scanner)
+    virtual int cost() const { return 1; }
+  };
+
+  /// An interface for a task queue
+  ///
+  /// A queue's methods will not be called concurrently
+  class Queue {
+   public:
+    virtual ~Queue() = default;
+    /// Push a task to the queue
+    virtual void Push(std::unique_ptr<Task> task) = 0;
+    /// Pop the next task from the queue
+    virtual std::unique_ptr<Task> Pop() = 0;
+    /// Peek the next task in the queue
+    virtual const Task& Peek() = 0;
+    /// Check if the queue is empty
+    virtual bool Empty() = 0;
+    /// Purge the queue of all items
+    virtual void Purge() = 0;
+  };
+
+  class Throttle {
+   public:
+    virtual ~Throttle() = default;
+    /// Acquire amt permits
+    ///
+    /// If nullopt is returned then the permits were immediately
+    /// acquired and the caller can proceed.  If a future is returned then the 
caller
+    /// should wait for the future to complete first.  When the returned 
future completes
+    /// the permits have NOT been acquired and the caller must call TryAcquire 
again
+    virtual util::optional<Future<>> TryAcquire(int amt) = 0;
+    /// Release amt permits
+    ///
+    /// This will possibly complete waiting futures and should probably not be
+    /// called while holding locks.
+    virtual void Release(int amt) = 0;
+  };
+  /// Create a throttle
+  ///
+  /// This throttle is used to limit how many tasks can run at once.  The
+  /// user should keep the throttle alive for the lifetime of the scheduler.
+  /// The same throttle can be used in multiple schedulers.
+  static std::unique_ptr<Throttle> MakeThrottle(int max_concurrent_cost);
+
+  /// Add a task to the scheduler
+  ///
+  /// If the scheduler is in an aborted state this call will return false and 
the task
+  /// will never be run.  This is harmless and does not need to be guarded 
against.
+  ///
+  /// If the scheduler is in an ended state then this call will cause an 
abort.  This
+  /// represents a logic error in the program and should be avoidable.

Review Comment:
   I wonder if we should error instead just so that a bug doesn't, say, tear 
down a notebook kernel or crash the R interpreter?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to