westonpace commented on code in PR #34060:
URL: https://github.com/apache/arrow/pull/34060#discussion_r1101952906


##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -53,5 +56,98 @@ class AccumulationQueue {
   std::vector<ExecBatch> batches_;
 };
 
+/// A queue that sequences incoming batches
+///
+/// This can be used when a node needs to do some kind of ordered processing on
+/// the stream.
+///
+/// Batches can be inserted in any order.  The process_callback will be called on
+/// the batches, in order, without reentrant calls. For this reason the callback
+/// should be quick.
+///
+/// For example, in a top-n node, the process callback should determine how many
+/// rows need to be delivered for the given batch, and then return a task to actually
+/// deliver those rows.
+class SequencingQueue {
+ public:
+  using Task = std::function<Status()>;
+
+  /// Strategy that describes how to handle items
+  class Processor {
+   public:
+    /// Process the batch, potentially generating a task
+    ///
+    /// This method will be called on each batch in order.  Calls to this method
+    /// will be serialized and it will not be called reentrantly.  This makes it
+    /// safe to do things that rely on order, but minimal time should be spent here
+    /// to avoid becoming a bottleneck.
+    ///
+    /// \return a follow-up task that will be scheduled.  The follow-up tasks are
+    ///         not guaranteed to run in any particular order.  If nullopt is
+    ///         returned then nothing will be scheduled.
+    virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0;
+    /// Schedule a task
+    virtual void Schedule(Task task) = 0;
+  };
+
+  virtual ~SequencingQueue() = default;
+
+  /// Insert a batch into the queue
+  ///
+  /// This will insert the batch into the queue.  If this batch was the next batch
+  /// to deliver then this will trigger one or more calls to the process callback to
+  /// generate one or more tasks.
+  ///
+  /// The task generated by this call will be executed immediately.  The remaining
+  /// tasks will be scheduled using the schedule callback.

Review Comment:
   I cleaned up the wording a little and added an extra paragraph.
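To check my reading of the doc comments, here is a minimal, self-contained sketch of the behaviour they describe. It is not the implementation in this PR: `Batch`, `SimpleSequencingQueue` and `LimitProcessor` are hypothetical names, plain `std` types stand in for `ExecBatch`, `Status` and `Result`, and "run the first generated task inline, hand the rest to `Schedule`" is just one interpretation of the `Insert` comment. Out-of-order batches are parked in a `std::map` keyed by index, and `Process` is only called while holding a mutex, which is what keeps those calls serialized.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for ExecBatch: only what this sketch needs.
struct Batch {
  int64_t index;     // position of the batch in the original stream
  int64_t num_rows;
};

using Task = std::function<void()>;

// Same shape as SequencingQueue::Processor in the diff, with plain types
// instead of arrow::Result / arrow::Status.
class Processor {
 public:
  virtual ~Processor() = default;
  // Called once per batch, in index order, never concurrently.
  virtual std::optional<Task> Process(Batch batch) = 0;
  // Receives every follow-up task except the one run inline by Insert.
  virtual void Schedule(Task task) = 0;
};

class SimpleSequencingQueue {
 public:
  explicit SimpleSequencingQueue(Processor* processor) : processor_(processor) {}

  void Insert(Batch batch) {
    std::vector<Task> tasks;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(batch.index >= next_index_ && pending_.count(batch.index) == 0);
      pending_.emplace(batch.index, batch);
      // Drain every batch that is now contiguous with those already processed.
      // Holding the lock while calling Process is what serializes those calls.
      while (!pending_.empty() && pending_.begin()->first == next_index_) {
        Batch next = pending_.begin()->second;
        pending_.erase(pending_.begin());
        ++next_index_;
        if (std::optional<Task> task = processor_->Process(next)) {
          tasks.push_back(std::move(*task));
        }
      }
    }
    // Batch was out of order (or produced no work): nothing to run yet.
    if (tasks.empty()) return;
    // Hand the extra tasks to the scheduler, then run the first one inline.
    for (std::size_t i = 1; i < tasks.size(); ++i) {
      processor_->Schedule(std::move(tasks[i]));
    }
    tasks[0]();
  }

 private:
  Processor* processor_;
  std::mutex mutex_;
  std::map<int64_t, Batch> pending_;  // out-of-order batches, keyed by index
  int64_t next_index_ = 0;            // index of the next batch to process
};

// Hypothetical top-n / limit style processor: decides in order how many rows
// of each batch to keep, and defers the actual "delivery" to a task.
class LimitProcessor : public Processor {
 public:
  explicit LimitProcessor(int64_t limit) : remaining_(limit) {}

  std::optional<Task> Process(Batch batch) override {
    int64_t take = std::min(remaining_, batch.num_rows);
    remaining_ -= take;
    if (take == 0) return std::nullopt;  // limit reached: schedule nothing
    return Task([this, take] { delivered += take; });
  }

  // Stands in for an executor: tasks are just collected to be run later.
  void Schedule(Task task) override {
    std::lock_guard<std::mutex> lock(schedule_mutex_);
    scheduled.push_back(std::move(task));
  }

  std::atomic<int64_t> delivered{0};
  std::vector<Task> scheduled;

 private:
  int64_t remaining_;  // only touched from Process, which is serialized
  std::mutex schedule_mutex_;
};
```

With a limit of 25 rows, inserting batch 1 first does nothing. Inserting batch 0 then processes both: batch 0's task runs inline and batch 1's task goes to `Schedule`. Holding the mutex across `Process` keeps the sketch short, but a `Process` that called back into `Insert` would deadlock. So this relies on the "not called reentrantly" contract rather than enforcing it.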



-- 
This is an automated message from the Apache Git Service.