bkietz commented on a change in pull request #9589:
URL: https://github.com/apache/arrow/pull/9589#discussion_r584777161



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState {
+  explicit ToBatchesState(size_t n_tasks)
+      : batches(n_tasks), task_drained(n_tasks, false) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= i_task) {
+      batches.resize(i_task + 1);
+      task_drained.resize(i_task + 1);
+    }
+    batches[i_task].push_back(std::move(b));
+  }
+
+  Status Finish(size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    task_drained[position] = true;
+    return Status::OK();
+  }
+
+  std::shared_ptr<RecordBatch> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] {

Review comment:
       This is indeed an (embarrassing) error. The intent was to throttle
checking the variable to 1ms intervals, but I forgot to write the enclosing
loop. I'll replace it with a simple call to `wait()`.
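
For reference, a minimal sketch of what a `wait()`-based version could look like. This is not the code from the PR: `BatchQueue`, `ready`, and the `int` payload are hypothetical stand-ins for `ToBatchesState` and its record batches. The sketch assumes the condition variable becomes a member of the state object, since a temporary `std::condition_variable()` can never be notified by another thread.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical, simplified stand-in for ToBatchesState: a single queue of ints.
struct BatchQueue {
  std::mutex mutex;
  std::condition_variable ready;  // member, so producers can notify waiters
  std::deque<int> items;
  bool drained = false;

  void Push(int value) {
    {
      std::lock_guard<std::mutex> lock(mutex);
      items.push_back(value);
    }
    ready.notify_one();  // wake a consumer blocked in Pop()
  }

  void Finish() {
    {
      std::lock_guard<std::mutex> lock(mutex);
      drained = true;
    }
    ready.notify_all();  // wake every consumer so it can observe `drained`
  }

  // Blocks until an item is available or the producer has finished.
  // Returns false once the queue is both drained and empty.
  bool Pop(int* out) {
    std::unique_lock<std::mutex> lock(mutex);
    // The predicate overload re-checks the condition after every wakeup,
    // so spurious wakeups are handled without an explicit loop.
    ready.wait(lock, [this] { return !items.empty() || drained; });
    if (items.empty()) return false;
    *out = items.front();
    items.pop_front();
    return true;
  }
};
```

With this shape, `Push` and `Finish` must each notify after changing the state that the predicate reads; otherwise `Pop` could block indefinitely.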




