westonpace commented on code in PR #13912:
URL: https://github.com/apache/arrow/pull/13912#discussion_r950426026
##########
cpp/src/arrow/util/async_util.h:
##########
@@ -147,70 +207,42 @@ class ARROW_EXPORT AsyncTaskGroup {
///
/// This is a utility method for workflows where the finish future needs to be
/// referenced before all top level tasks have been queued.
- Future<> OnFinished() const;
+ virtual Future<> OnFinished() const = 0;
- private:
- Status AddTaskUnlocked(const Future<>& task, util::Mutex::Guard guard);
-
- bool finished_adding_ = false;
- int running_tasks_ = 0;
- Status err_;
- Future<> all_tasks_done_ = Future<>::Make();
- util::Mutex mutex_;
-};
-
-/// A task group which serializes asynchronous tasks in a push-based workflow
-///
-/// Tasks will be executed in the order they are added
-///
-/// This will buffer results in an unlimited fashion so it should be combined
-/// with some kind of backpressure
-class ARROW_EXPORT SerializedAsyncTaskGroup {
- public:
- SerializedAsyncTaskGroup();
- /// Push an item into the serializer and (eventually) into the consumer
+ /// Create a sub-scheduler for tracking a subset of tasks
///
- /// The item will not be delivered to the consumer until all previous items have been
- /// consumed.
+ /// The parent scheduler will manage the lifetime of the sub-scheduler. It will
+ /// be destroyed once it is finished.
///
- /// If the consumer returns an error then this serializer will go into an error state
- /// and all subsequent pushes will fail with that error. Pushes that have been queued
- /// but not delivered will be silently dropped.
+ /// Often some state needs to be associated with a subset of tasks.
+ /// For example, when scanning a dataset we need to keep a file reader
+ /// alive for all of the read tasks for each file. A sub-scheduler can be used to do
+ /// this.
///
- /// \return True if the item was pushed immediately to the consumer, false if it was
- /// queued
- Status AddTask(std::function<Result<Future<>>()> task);
-
- /// Signal that all top level tasks have been added
+ /// The parent scheduler may be ended before all of its sub-schedulers
+ /// are ended.
///
- /// The returned future that will finish when all tasks have been consumed.
- Future<> End();
-
- /// Abort a task group
+ /// If either the parent scheduler or the sub-scheduler encounter an error
+ /// then they will both enter an aborted state (this is a shared state).
+ /// Finish callbacks will not be run when the scheduler is aborted.
///
- /// Tasks that have not been started will be discarded
+ /// The parent scheduler will not complete until the sub-scheduler's
+ /// tasks (and finish callback) have all executed.
///
- /// The returned future will finish when all running tasks have finished.
- Future<> Abort(Status err);
+ /// A sub-scheduler can share the same throttle as its parent but it
+ /// can also have its own unique throttle.
Review Comment:
I suppose it does. In practice, the real cases I have do the opposite. When
writing datasets we create a sub-scheduler per file, and that sub-scheduler is
only allowed to run one task at a time to prevent concurrent writes. In the
scanner there is a sub-scheduler for each input file, and all of the input file
sub-schedulers share a common throttle. There is also a fragment scanner which
may have its own independent throttle (just to control how many open files
there are, though fragment readahead will be less critical I think). I don't
think the fragment scanner is the parent of the file scanners (I think they
will be siblings), but it might be.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]