westonpace commented on code in PR #34060:
URL: https://github.com/apache/arrow/pull/34060#discussion_r1101952906
##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -53,5 +56,98 @@ class AccumulationQueue {
std::vector<ExecBatch> batches_;
};
+/// A queue that sequences incoming batches
+///
+/// This can be used when a node needs to do some kind of ordered processing on
+/// the stream.
+///
+/// Batches can be inserted in any order. The process_callback will be called on
+/// the batches, in order, without reentrant calls. For this reason the callback
+/// should be quick.
+///
+/// For example, in a top-n node, the process callback should determine how many
+/// rows need to be delivered for the given batch, and then return a task to actually
+/// deliver those rows.
+class SequencingQueue {
+ public:
+ using Task = std::function<Status()>;
+
+ /// Strategy that describes how to handle items
+ class Processor {
+ public:
+ /// Process the batch, potentially generating a task
+ ///
+ /// This method will be called on each batch in order. Calls to this method
+ /// will be serialized and it will not be called reentrantly. This makes it
+ /// safe to do things that rely on order, but minimal time should be spent here
+ /// to avoid becoming a bottleneck.
+ ///
+ /// \return a follow-up task that will be scheduled. The follow-up tasks are
+ ///         not guaranteed to run in any particular order. If nullopt is
+ ///         returned then nothing will be scheduled.
+ virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0;
+ /// Schedule a task
+ virtual void Schedule(Task task) = 0;
+ };
+
+ virtual ~SequencingQueue() = default;
+
+ /// Insert a batch into the queue
+ ///
+ /// This will insert the batch into the queue. If this batch was the next batch
+ /// to deliver then this will trigger 1+ calls to the process callback to generate
+ /// 1+ tasks.
+ ///
+ /// The task generated by this call will be executed immediately. The remaining
+ /// tasks will be scheduled using the schedule callback.
Review Comment:
I cleaned up the wording a little and added an extra paragraph.
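To make the `Insert` contract above concrete (out-of-order inserts, in-order and serialized `Process` calls, the inserted batch's task run inline and the rest handed to `Schedule`), here is a minimal standalone sketch. It is not the Arrow implementation: `Batch`, the `bool`-returning `Task` (standing in for `std::function<Status()>`), `SimpleSequencingQueue` and `FirstNProcessor` are hypothetical, and it assumes every batch carries a dense, 0-based sequence index.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-ins: Task returns bool (true == OK) instead of arrow::Status,
// and Batch replaces ExecBatch with a sequence index plus some integer rows.
using Task = std::function<bool()>;
struct Batch {
  int64_t index;  // assumed dense and 0-based across the stream
  std::vector<int> rows;
};

class Processor {
 public:
  virtual ~Processor() = default;
  virtual std::optional<Task> Process(Batch batch) = 0;  // called in order, serialized
  virtual void Schedule(Task task) = 0;
};

class SimpleSequencingQueue {
 public:
  explicit SimpleSequencingQueue(Processor* processor) : processor_(processor) {}

  // May be called from any thread, with batches in any order.
  bool Insert(Batch batch) {
    std::optional<Task> own_task;  // task for the inserted batch, run inline
    std::vector<Task> follow_ups;  // tasks for unblocked batches, handed to Schedule
    {
      std::lock_guard<std::mutex> lock(mutex_);  // serializes all Process calls
      const int64_t index = batch.index;
      assert(index >= next_index_ && "batch index already delivered");
      if (index != next_index_) {
        pending_.emplace(index, std::move(batch));  // not its turn yet: park it
        return true;
      }
      ++next_index_;
      own_task = processor_->Process(std::move(batch));
      // The inserted batch may unblock batches that arrived earlier out of order.
      auto it = pending_.begin();
      while (it != pending_.end() && it->first == next_index_) {
        ++next_index_;
        std::optional<Task> task = processor_->Process(std::move(it->second));
        if (task) follow_ups.push_back(std::move(*task));
        it = pending_.erase(it);
      }
    }
    for (Task& task : follow_ups) processor_->Schedule(std::move(task));
    return own_task ? (*own_task)() : true;
  }

 private:
  Processor* processor_;
  std::mutex mutex_;
  int64_t next_index_ = 0;
  std::map<int64_t, Batch> pending_;  // out-of-order batches keyed by index
};

// Hypothetical processor in the spirit of the top-n example: Process only decides
// how many rows of each batch to keep; the returned task delivers them.
// Single-threaded demo: the public vectors below are not synchronized.
class FirstNProcessor : public Processor {
 public:
  explicit FirstNProcessor(int n) : remaining_(n) {}

  std::optional<Task> Process(Batch batch) override {
    order.push_back(batch.index);
    int take = std::min(remaining_, static_cast<int>(batch.rows.size()));
    remaining_ -= take;
    if (take == 0) return std::nullopt;  // nothing to deliver, nothing scheduled
    std::vector<int> rows(batch.rows.begin(), batch.rows.begin() + take);
    return Task([this, rows] {
      delivered.insert(delivered.end(), rows.begin(), rows.end());
      return true;
    });
  }
  void Schedule(Task task) override { scheduled.push_back(std::move(task)); }

  // Stand-in for an executor: run everything handed to Schedule so far.
  bool RunScheduled() {
    bool ok = true;
    for (Task& task : scheduled) ok = task() && ok;
    scheduled.clear();
    return ok;
  }

  std::vector<int64_t> order;  // indices in the order Process saw them
  std::vector<int> delivered;  // rows delivered by tasks that have run
  std::vector<Task> scheduled;

 private:
  int remaining_;
};
```

Inserting batches 2, 1, 0 in that order into a queue backed by `FirstNProcessor(5)` parks the first two; the third insert then drives `Process` for 0, 1 and 2 in sequence, runs batch 0's task inline and hands batch 1's task to `Schedule` (batch 2 yields `nullopt` because the limit is already reached). Holding the mutex while calling `Process` is just one simple way to get the serialized, non-reentrant guarantee; note that in this sketch a `Process` that called back into `Insert` would deadlock.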
--
This is an automated message from the Apache Git Service.