bkietz commented on a change in pull request #9589: URL: https://github.com/apache/arrow/pull/9589#discussion_r584777161
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
       FlattenRecordBatchVector(std::move(state->batches)));
 }
+struct ToBatchesState {
+  explicit ToBatchesState(size_t n_tasks)
+      : batches(n_tasks), task_drained(n_tasks, false) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= i_task) {
+      batches.resize(i_task + 1);
+      task_drained.resize(i_task + 1);
+    }
+    batches[i_task].push_back(std::move(b));
+  }
+
+  Status Finish(size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    task_drained[position] = true;
+    return Status::OK();
+  }
+
+  std::shared_ptr<RecordBatch> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] {

Review comment:
This is indeed an (embarrassing) error. The intent was to throttle checking the variable to 1ms intervals but I forgot to write the enclosing loop. I'll replace it with a simple call to `wait()`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org