westonpace commented on code in PR #34060:
URL: https://github.com/apache/arrow/pull/34060#discussion_r1101952642


##########
cpp/src/arrow/compute/exec/accumulation_queue.cc:
##########
@@ -54,5 +59,112 @@ void AccumulationQueue::Clear() {
 }
 
 ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
+
+namespace {
+
+struct LowestBatchIndexAtTop {
+  bool operator()(const ExecBatch& left, const ExecBatch& right) const {
+    return left.index > right.index;
+  }
+};
+
+class SequencingQueueImpl : public SequencingQueue {
+ public:
+  explicit SequencingQueueImpl(Processor* processor) : processor_(processor) {}
+
+  Status InsertBatch(ExecBatch batch) override {
+    std::unique_lock lk(mutex_);
+    if (batch.index == next_index_) {
+      return DeliverNextUnlocked(std::move(batch), std::move(lk));
+    }
+    queue_.emplace(std::move(batch));
+    return Status::OK();
+  }
+
+ private:
+  Status DeliverNextUnlocked(ExecBatch batch, std::unique_lock<std::mutex>&& lk) {
+    // Should be able to detect and avoid this at plan construction
+    DCHECK_NE(batch.index, ::arrow::compute::kUnsequencedIndex)
+        << "attempt to use a sequencing queue on an unsequenced stream of 
batches";
+    std::vector<Task> tasks;
+    next_index_++;
+    ARROW_ASSIGN_OR_RAISE(std::optional<Task> this_task,
+                          processor_->Process(std::move(batch)));
+    while (!queue_.empty() && next_index_ == queue_.top().index) {
+      ARROW_ASSIGN_OR_RAISE(std::optional<Task> task, processor_->Process(queue_.top()));
+      if (task) {
+        tasks.push_back(std::move(*task));
+      }
+      queue_.pop();
+      next_index_++;
+    }
+    lk.unlock();
+    // Schedule tasks for stale items
+    for (auto& task : tasks) {
+      processor_->Schedule(std::move(task));
+    }
+    // Run the current item immediately
+    if (this_task) {
+      ARROW_RETURN_NOT_OK(std::move(*this_task)());
+    }
+    return Status::OK();
+  }
+
+  Processor* processor_;
+
+  std::priority_queue<ExecBatch, std::vector<ExecBatch>, LowestBatchIndexAtTop> queue_;
+  int next_index_ = 0;
+  std::mutex mutex_;
+};
+
+class SerialSequencingQueueImpl : public SerialSequencingQueue {
+ public:
+  explicit SerialSequencingQueueImpl(Processor* processor) : processor_(processor) {}
+
+  Status InsertBatch(ExecBatch batch) override {
+    std::unique_lock lk(mutex_);
+    queue_.push(std::move(batch));
+    if (queue_.top().index == next_index_ && !is_processing_) {
+      is_processing_ = true;
+      return DoProcess(std::move(lk));
+    }
+    return Status::OK();
+  }
+
+ private:
+  Status DoProcess(std::unique_lock<std::mutex>&& lk) {
+    while (!queue_.empty() && queue_.top().index == next_index_) {
+      ExecBatch next(queue_.top());
+      queue_.pop();
+      next_index_++;
+      lk.unlock();
+      // If we bail here we don't hold the lock, so that is ok.  is_processing_ will
+      // never switch back to false, so no other threads can process, but that
+      // should be ok

Review Comment:
   Yes.  Fixed.
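
   For readers following along, here is a minimal, single-threaded sketch of the
   resequencing idea the diff implements: out-of-order batches wait in a min-heap
   keyed on their index, and a batch is delivered only once the next expected
   index reaches the top of the heap.  This is not Arrow code; Batch, Resequencer,
   LowestIndexAtTop, and main() below are invented for illustration, and the mutex
   and the Processor/Task scheduling from the diff are omitted.

   #include <cstdint>
   #include <functional>
   #include <iostream>
   #include <queue>
   #include <string>
   #include <utility>
   #include <vector>

   // Stand-in for ExecBatch: only the sequencing index and some payload.
   struct Batch {
     int64_t index;
     std::string payload;
   };

   // Orders the priority_queue so the batch with the lowest index is on top,
   // mirroring LowestBatchIndexAtTop in the diff.
   struct LowestIndexAtTop {
     bool operator()(const Batch& left, const Batch& right) const {
       return left.index > right.index;
     }
   };

   class Resequencer {
    public:
     explicit Resequencer(std::function<void(Batch)> deliver)
         : deliver_(std::move(deliver)) {}

     // Insert a possibly out-of-order batch.  Anything now contiguous with the
     // last delivered index is emitted immediately, in order.
     void Insert(Batch batch) {
       queue_.push(std::move(batch));
       while (!queue_.empty() && queue_.top().index == next_index_) {
         Batch next = queue_.top();
         queue_.pop();
         ++next_index_;
         deliver_(std::move(next));
       }
     }

    private:
     std::function<void(Batch)> deliver_;
     std::priority_queue<Batch, std::vector<Batch>, LowestIndexAtTop> queue_;
     int64_t next_index_ = 0;
   };

   int main() {
     Resequencer seq([](Batch b) { std::cout << b.index << ": " << b.payload << "\n"; });
     // Out-of-order arrival; output is still 0, 1, 2, 3.
     seq.Insert({2, "c"});
     seq.Insert({0, "a"});
     seq.Insert({1, "b"});
     seq.Insert({3, "d"});
     return 0;
   }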


