westonpace commented on code in PR #14524:
URL: https://github.com/apache/arrow/pull/14524#discussion_r1018410231


##########
cpp/src/arrow/util/async_util.h:
##########
@@ -145,206 +188,216 @@ class ARROW_EXPORT AsyncTaskScheduler {
     /// acquired and the caller can proceed.  If a future is returned then the caller
     /// should wait for the future to complete first.  When the returned future completes
     /// the permits have NOT been acquired and the caller must call Acquire again
+    ///
+    /// \param amt the number of permits to acquire
     virtual std::optional<Future<>> TryAcquire(int amt) = 0;
     /// Release amt permits
     ///
     /// This will possibly complete waiting futures and should probably not be
     /// called while holding locks.
+    ///
+    /// \param amt the number of permits to release
     virtual void Release(int amt) = 0;
 
     /// The size of the largest task that can run
     ///
     /// Incoming tasks will have their cost latched to this value to ensure
-    /// they can still run (although they will generally be the only thing allowed to
+    /// they can still run (although they will be the only thing allowed to
     /// run at that time).
     virtual int Capacity() = 0;
+
+    /// Pause the throttle
+    ///
+    /// Any tasks that have been submitted already will continue.  However, no new tasks
+    /// will be run until the throttle is resumed.
+    virtual void Pause() = 0;
+    /// Resume the throttle
+    ///
+    /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
+    /// it will still apply.
+    virtual void Resume() = 0;
   };
-  /// Create a throttle
+
+  /// Pause the throttle
+  ///
+  /// Any tasks that have been submitted already will continue.  However, no new tasks
+  /// will be run until the throttle is resumed.
+  virtual void Pause() = 0;
+  /// Resume the throttle
   ///
-  /// This throttle is used to limit how many tasks can run at once.  The
-  /// user should keep the throttle alive for the lifetime of the scheduler.
-  /// The same throttle can be used in multiple schedulers.
-  static std::unique_ptr<Throttle> MakeThrottle(int max_concurrent_cost);
+  /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
+  /// it will still apply.
+  virtual void Resume() = 0;
 
-  /// Add a task to the scheduler
+  /// Create a throttled view of a scheduler
   ///
-  /// If the scheduler is in an aborted state this call will return false and the task
-  /// will never be run.  This is harmless and does not need to be guarded against.
+  /// Tasks added via this view will be subjected to the throttle and, if the tasks cannot
+  /// run immediately, will be placed into a queue.
   ///
-  /// If the scheduler is in an ended state then this call will cause an program abort.
-  /// This represents a logic error in the program and should be avoidable.
+  /// Using a throttled view after the underlying scheduler has finished is invalid.
   ///
-  /// If there are no limits on the number of concurrent tasks then the submit function
-  /// will be run immediately.
+  /// Although a shared_ptr is returned it should generally be assumed that the caller
+  /// is being given exclusive ownership.  The shared_ptr is used to share the view with
+  /// queued and submitted tasks and the lifetime of those is unpredictable.  It is
+  /// important the caller keep the returned pointer alive for as long as they plan to add
+  /// tasks to the view.
   ///
-  /// Otherwise, if there is a throttle, and it is full, then this task will be inserted
-  /// into the scheduler's queue and submitted when there is space.
+  /// \param scheduler a scheduler to submit tasks to after throttling
   ///
-  /// The return value for this call can usually be ignored.  There is little harm in
-  /// attempting to add tasks to an aborted scheduler.  It is only included for callers
-  /// that want to avoid future task generation.
+  /// This can be the root scheduler, another throttled scheduler, or a task group.  These
+  /// are all composable.
   ///
-  /// \return true if the task was submitted or queued, false if the task was ignored
-  virtual bool AddTask(std::unique_ptr<Task> task) = 0;
+  /// \param max_concurrent_cost the maximum amount of cost allowed to run at any one time
+  ///
+  /// If a task is added that has a cost greater than max_concurrent_cost then its cost
+  /// will be reduced to max_concurrent_cost so that it is still possible for the task to
+  /// run.
+  ///
+  /// \param queue the queue to use when tasks cannot be submitted
+  ///
+  /// By default a FIFO queue will be used.  However, a custom queue can be provided if
+  /// some tasks have higher priority than other tasks.
+  static std::shared_ptr<ThrottledAsyncTaskScheduler> Make(
+      AsyncTaskScheduler* scheduler, int max_concurrent_cost,
+      std::unique_ptr<Queue> queue = NULLPTR);
 
-  /// Adds an async generator to the scheduler
+  /// \brief Create a ThrottledAsyncTaskScheduler using a custom throttle
   ///
-  /// The async generator will be visited, one item at a time.  Submitting a task
-  /// will consist of polling the generator for the next future.  The generator's future
-  /// will then represent the task itself.
+  /// \see Make
+  static std::shared_ptr<ThrottledAsyncTaskScheduler> MakeWithCustomThrottle(
+      AsyncTaskScheduler* scheduler, std::unique_ptr<Throttle> throttle,
+      std::unique_ptr<Queue> queue = NULLPTR);
+};
+
+/// A utility to keep track of a collection of tasks
+///
+/// Often it is useful to keep track of some state that only needs to stay alive
+/// for some small collection of tasks, or to perform some kind of final cleanup
+/// when a collection of tasks is finished.
+///
+/// For example, when scanning, we need to keep the file reader alive while all scan
+/// tasks run for a given file, and then we can gracefully close it when we finish the
+/// file.
+class ARROW_EXPORT AsyncTaskGroup : public AsyncTaskScheduler {
+ public:
+  /// Destructor for the task group
   ///
-  /// This visits the task serially without readahead.  If readahead or parallelism
-  /// is desired then it should be added in the generator itself.
+  /// The finish callback will not run until the task group is destroyed and all
+  /// tasks are finished so you will generally want to eagerly call this at some point

Review Comment:
   Yes, good point.  This comment made more sense on an older iteration of the code.
   I moved this paragraph to the `Make` call, since that is the point where the
   unique_ptr is returned, and made it clearer that the user should eagerly
   reset/destroy the unique_ptr.
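
   As a concrete illustration of the Pause/Resume and composability notes in this hunk,
   here is a minimal sketch built only from the signatures quoted above. It assumes the
   types live in the `arrow::util` namespace, that a root `AsyncTaskScheduler*` is
   obtained elsewhere, and that a `ThrottledAsyncTaskScheduler` can itself be passed
   wherever an `AsyncTaskScheduler*` is expected (as the "these are all composable" note
   suggests); the function and variable names are placeholders, not code from the PR.

   ```cpp
   #include <memory>

   #include "arrow/util/async_util.h"

   // Sketch only: `root` is assumed to be a valid root scheduler obtained elsewhere.
   void SketchThrottling(arrow::util::AsyncTaskScheduler* root) {
     // A throttled view that allows at most 4 units of cost to run at once.
     std::shared_ptr<arrow::util::ThrottledAsyncTaskScheduler> throttled =
         arrow::util::ThrottledAsyncTaskScheduler::Make(root, /*max_concurrent_cost=*/4);

     // Views compose: a tighter throttle layered on top of the first one.
     std::shared_ptr<arrow::util::ThrottledAsyncTaskScheduler> serial =
         arrow::util::ThrottledAsyncTaskScheduler::Make(throttled.get(),
                                                        /*max_concurrent_cost=*/1);
     (void)serial;  // tasks would be added through `serial` or `throttled`

     // Tasks already submitted keep running; nothing new starts until Resume().
     throttled->Pause();
     // ... add tasks here; they queue while the throttle is paused ...
     throttled->Resume();

     // Keep the returned shared_ptrs alive for as long as tasks will still be added.
   }
   ```

   The nested `Make` call is what the "composable" comment refers to: each throttled
   view is just another scheduler that further views (or task groups) can be built on.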


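   Since the hunk also spells out the full `Throttle` interface (`TryAcquire`, `Release`,
   `Capacity`, `Pause`, `Resume`), a custom throttle for `MakeWithCustomThrottle` can be
   sketched from it as well. The class below is illustrative only and rests on a few
   assumptions: that `Throttle` is a public nested type of `ThrottledAsyncTaskScheduler`,
   that `arrow::Future<>::Make()` / `MarkFinished()` behave as elsewhere in Arrow, and
   that waking all waiters and letting them retry `TryAcquire` satisfies the documented
   contract; a production throttle would need more care around fairness and contention.

   ```cpp
   #include <algorithm>
   #include <mutex>
   #include <optional>

   #include "arrow/util/async_util.h"
   #include "arrow/util/future.h"

   // Toy throttle (illustrative only): grants permits immediately when possible,
   // otherwise hands out a shared "backoff" future.  Per the doc comment in the hunk,
   // completion of that future does NOT grant permits; callers must call TryAcquire again.
   class SimpleThrottle : public arrow::util::ThrottledAsyncTaskScheduler::Throttle {
    public:
     explicit SimpleThrottle(int capacity) : capacity_(capacity), available_(capacity) {}

     std::optional<arrow::Future<>> TryAcquire(int amt) override {
       std::lock_guard<std::mutex> lk(mutex_);
       amt = std::min(amt, capacity_);  // latch oversized tasks to Capacity()
       if (!paused_ && available_ >= amt) {
         available_ -= amt;
         return std::nullopt;  // permits acquired, caller may proceed
       }
       if (!backoff_.is_valid()) backoff_ = arrow::Future<>::Make();
       return backoff_;  // caller waits, then retries TryAcquire
     }

     void Release(int amt) override {
       arrow::Future<> to_finish;
       {
         std::lock_guard<std::mutex> lk(mutex_);
         available_ += std::min(amt, capacity_);
         to_finish = backoff_;
         backoff_ = arrow::Future<>();  // reset to an empty (invalid) future
       }
       // Complete waiters outside the lock, as the doc comment recommends.
       if (to_finish.is_valid()) to_finish.MarkFinished();
     }

     int Capacity() override { return capacity_; }

     void Pause() override {
       std::lock_guard<std::mutex> lk(mutex_);
       paused_ = true;
     }

     void Resume() override {
       arrow::Future<> to_finish;
       {
         std::lock_guard<std::mutex> lk(mutex_);
         paused_ = false;
         to_finish = backoff_;
         backoff_ = arrow::Future<>();
       }
       if (to_finish.is_valid()) to_finish.MarkFinished();
     }

    private:
     std::mutex mutex_;
     const int capacity_;
     int available_;
     bool paused_ = false;
     arrow::Future<> backoff_;
   };

   // Hypothetical usage, given a root scheduler pointer `root`:
   // auto view = arrow::util::ThrottledAsyncTaskScheduler::MakeWithCustomThrottle(
   //     root, std::make_unique<SimpleThrottle>(/*capacity=*/4));
   ```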
