westonpace commented on code in PR #13912:
URL: https://github.com/apache/arrow/pull/13912#discussion_r953206180
##########
cpp/src/arrow/util/async_util.h:
##########
@@ -17,128 +17,188 @@
#pragma once
-#include <queue>
-
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/util/functional.h"
#include "arrow/util/future.h"
+#include "arrow/util/make_unique.h"
#include "arrow/util/mutex.h"
namespace arrow {
-namespace util {
-/// Custom deleter for AsyncDestroyable objects
-template <typename T>
-struct DestroyingDeleter {
- void operator()(T* p) {
- if (p) {
- p->Destroy();
- }
- }
-};
+using internal::FnOnce;
+using internal::make_unique;
-/// An object which should be asynchronously closed before it is destroyed
+namespace util {
+
+/// A utility which keeps tracks of, and schedules, asynchronous tasks
///
-/// Classes can extend this to ensure that the close method is called and
completed
-/// before the instance is deleted. This provides smart_ptr / delete
semantics for
-/// objects with an asynchronous destructor.
+/// An asynchronous task has a synchronous component and an asynchronous
component.
+/// The synchronous component typically schedules some kind of work on an
external
+/// resource (e.g. the I/O thread pool or some kind of kernel-based
asynchronous
+/// resource like io_uring). The asynchronous part represents the work
+/// done on that external resource. Executing the synchronous part will be
referred
+/// to as "submitting the task" since this usually includes submitting the
asynchronous
+/// portion to the external thread pool.
///
-/// Classes which extend this must be constructed using MakeSharedAsync or
MakeUniqueAsync
-class ARROW_EXPORT AsyncDestroyable {
- public:
- AsyncDestroyable();
- virtual ~AsyncDestroyable();
-
- /// A future which will complete when the AsyncDestroyable has finished and
is ready
- /// to be deleted.
- ///
- /// This can be used to ensure all work done by this object has been
completed before
- /// proceeding.
- Future<> on_closed() { return on_closed_; }
-
- protected:
- /// Subclasses should override this and perform any cleanup. Once the
future returned
- /// by this method finishes then this object is eligible for destruction and
any
- /// reference to `this` will be invalid
- virtual Future<> DoDestroy() = 0;
-
- private:
- void Destroy();
-
- Future<> on_closed_;
-#ifndef NDEBUG
- bool constructed_correctly_ = false;
-#endif
-
- template <typename T>
- friend struct DestroyingDeleter;
- template <typename T, typename... Args>
- friend std::shared_ptr<T> MakeSharedAsync(Args&&... args);
- template <typename T, typename... Args>
- friend std::unique_ptr<T, DestroyingDeleter<T>> MakeUniqueAsync(Args&&...
args);
-};
-
-template <typename T, typename... Args>
-std::shared_ptr<T> MakeSharedAsync(Args&&... args) {
- static_assert(std::is_base_of<AsyncDestroyable, T>::value,
- "Nursery::MakeSharedCloseable only works with AsyncDestroyable
types");
- std::shared_ptr<T> ptr(new T(std::forward<Args&&>(args)...),
DestroyingDeleter<T>());
-#ifndef NDEBUG
- ptr->constructed_correctly_ = true;
-#endif
- return ptr;
-}
-
-template <typename T, typename... Args>
-std::unique_ptr<T, DestroyingDeleter<T>> MakeUniqueAsync(Args&&... args) {
- static_assert(std::is_base_of<AsyncDestroyable, T>::value,
- "Nursery::MakeUniqueCloseable only works with AsyncDestroyable
types");
- std::unique_ptr<T, DestroyingDeleter<T>> ptr(new
T(std::forward<Args>(args)...),
- DestroyingDeleter<T>());
-#ifndef NDEBUG
- ptr->constructed_correctly_ = true;
-#endif
- return ptr;
-}
-
-/// A utility which keeps track of a collection of asynchronous tasks
+/// By default the scheduler will submit the task (execute the synchronous
part) as
+/// soon as it is added, assuming the underlying thread pool hasn't terminated
or the
+/// scheduler hasn't aborted. In this mode the scheduler is simply acting as
+/// a task group, keeping track of the ongoing work.
///
/// This can be used to provide structured concurrency for asynchronous
development.
/// A task group created at a high level can be distributed amongst low level
components
/// which register work to be completed. The high level job can then wait for
all work
/// to be completed before cleaning up.
-class ARROW_EXPORT AsyncTaskGroup {
+///
+/// A task scheduler must eventually be ended when all tasks have been added.
Once the
+/// scheduler has been ended it is an error to add further tasks. Note, it is
not an
+/// error to add additional tasks after a scheduler has aborted (though these
tasks
+/// will be ignored and never submitted). The scheduler has a futuer which
will complete
+/// once the scheduler has been ended AND all remaining tasks have finished
executing.
+/// Ending a scheduler will NOT cause the scheduler to flush existing tasks.
+///
+/// Task failure (either the synchronous portion or the asynchronous portion)
will cause
+/// the scheduler to enter an aborted state. The first such failure will be
reported in
+/// the final task future.
+///
+/// The scheduler can also be manually aborted. A cancellation status will be
reported as
+/// the final task future.
+///
+/// It is also possible to limit the number of concurrent tasks the scheduler
will
+/// execute. This is done by setting a task limit. The task limit initially
assumes all
+/// tasks are equal but a custom cost can be supplied when scheduling a task
(e.g. based
+/// on the total I/O cost of the task, or the expected RAM utilization of the
task)
+///
+/// When the total number of running tasks is limited then scheduler priority
may also
+/// become a consideration. By default the scheduler runs with a FIFO queue
but a custom
+/// task queue can be provided. One could, for example, use a priority queue
to control
+/// the order in which tasks are executed.
+///
+/// It is common to have multiple stages of execution. For example, when
scanning, we
+/// first inspect each fragment (the inspect stage) to figure out the row
groups and then
+/// we scan row groups (the scan stage) to read in the data. This sort of
multi-stage
+/// execution should be represented as two seperate task groups. The first
task group can
+/// then have a custom finish callback which ends the second task group.
+class ARROW_EXPORT AsyncTaskScheduler {
public:
- /// Add a task to be tracked by this task group
- ///
- /// If a previous task has failed then adding a task will fail
- ///
- /// If WaitForTasksToFinish has been called and the returned future has been
marked
- /// completed then adding a task will fail.
- Status AddTask(std::function<Result<Future<>>()> task);
- /// Same as AddTask but doesn't add the task if End() has been called.
- ///
- /// \return true if the task was started, false if the group had already
ended
- Result<bool> AddTaskIfNotEnded(std::function<Result<Future<>>()> task);
- /// Add a task that has already been started
- Status AddTask(const Future<>& task);
- /// \brief Attempt to add a task that has already been started to this
group's tracking
- ///
- /// The return value must be paid attention to. If the return value is
false then the
- /// task could not be added because the group had already ended and so the
caller must
- /// track the external task some other way.
- Result<bool> AddTaskIfNotEnded(const Future<>& task);
- /// Signal that top level tasks are done being added
+ /// Destructor for AsyncTaskScheduler
+ ///
+ /// If a scheduler is not in the ended state when it is destroyed then it
+ /// will enter an aborted state.
+ ///
+ /// The destructor will block until all submitted tasks have finished.
+ virtual ~AsyncTaskScheduler() = default;
+ /// An interface for a task
+ ///
+ /// Users may want to override this, for example, to add priority
+ /// information for use by a queue.
+ class Task {
+ public:
+ virtual ~Task() = default;
+ /// Submit the task
+ ///
+ /// This will be called by the scheduler at most once when there
+ /// is space to run the task. This is expected to be a fairly quick
+ /// function that simply submits the actual task work to an external
+ /// resource (e.g. I/O thread pool).
+ ///
+ /// If this call fails then the scheduler will enter an aborted state.
+ virtual Result<Future<>> operator()(AsyncTaskScheduler* scheduler) = 0;
+ /// The cost of the task
+ ///
+ /// The scheduler limits the total number of concurrent tasks based
+ /// on cost. A custom cost may be used, for example, if you would like
+ /// to limit the number of tasks based on the total expected RAM usage of
+ /// the tasks (this is done in the scanner)
+ virtual int cost() const { return 1; }
+ };
+
+ /// An interface for a task queue
+ ///
+ /// A queue's methods will not be called concurrently
+ class Queue {
+ public:
+ virtual ~Queue() = default;
+ /// Push a task to the queue
+ virtual void Push(std::unique_ptr<Task> task) = 0;
+ /// Pop the next task from the queue
+ virtual std::unique_ptr<Task> Pop() = 0;
+ /// Peek the next task in the queue
+ virtual const Task& Peek() = 0;
+ /// Check if the queue is empty
+ virtual bool Empty() = 0;
+ /// Purge the queue of all items
+ virtual void Purge() = 0;
+ };
+
+ class Throttle {
+ public:
+ virtual ~Throttle() = default;
+ /// Acquire amt permits
+ ///
+ /// If nullopt is returned then the permits were immediately
+ /// acquired and the caller can proceed. If a future is returned then the
caller
+ /// should wait for the future to complete first. When the returned
future completes
+ /// the permits have NOT been acquired and the caller must call Acquire
again
+ virtual util::optional<Future<>> TryAcquire(int amt) = 0;
+ /// Release amt permits
+ ///
+ /// This will possibly complete waiting futures and should probably not be
+ /// called while holding locks.
+ virtual void Release(int amt) = 0;
+ };
+ /// Create a throttle
+ ///
+ /// This throttle is used to limit how many tasks can run at once. The
+ /// user should keep the throttle alive for the lifetime of the scheduler.
+ /// The same throttle can be used in multiple schedulers.
+ static std::unique_ptr<Throttle> MakeThrottle(int max_concurrent_cost);
+
+ /// Add a task to the scheduler
+ ///
+ /// If the scheduler is in an aborted state this call will return false and
the task
+ /// will never be run. This is harmless and does not need to be guarded
against.
+ ///
+ /// If the scheduler is in an ended state then this call will cause an
abort. This
+ /// represents a logic error in the program and should be avoidable.
Review Comment:
I added a message to the dcheck and added a lengthier explanation in a
comment before the dcheck which will hopefully assist future developers.
There is potentially one scenario where adding a task after the scheduler
has ended is ok and this is where the code doing the task-adding is a task on
the scheduler itself. This scenario popped up in the tpc-h generator but I was
able to work around it. For the moment I'd like to keep things as they are but
I'm open to possibly softening this requirement down the road.
It's may be possible to get of `End` entirely by requiring that all tasks
are added by existing tasks. Creating the initial scheduler would then require
supplying an "initial_task". In that case we would know that a scheduler is
finished when it runs out of tasks and would never need an `End` call. I'll
leave that for a follow-up (ARROW-17509)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]