westonpace commented on a change in pull request #9589:
URL: https://github.com/apache/arrow/pull/9589#discussion_r583932908
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                         FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState {
+  explicit ToBatchesState(size_t n_tasks)
+      : batches(n_tasks), task_drained(n_tasks, false) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (batches.size() <= i_task) {
+      batches.resize(i_task + 1);
+      task_drained.resize(i_task + 1);
+    }
+    batches[i_task].push_back(std::move(b));
+  }
+
+  Status Finish(size_t position) {
+    std::lock_guard<std::mutex> lock(mutex);
+    task_drained[position] = true;
+    return Status::OK();
+  }
+
+  std::shared_ptr<RecordBatch> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] {

Review comment:
This seems a little off. It looks like you're saying "if the current batch is still being filled, then wait up to 1ms for it to add a new item." But what happens when that 1ms expires? Also, it looks like you're ignoring the return value of `wait_for` (and since the `std::condition_variable` here is a temporary that nothing can ever notify, the wait can only ever time out).

For example, let's pretend there are two scan tasks:

* Scan task 1, RB 1 arrives at 10ms
* Scan task 1, RB 2 arrives at 20ms
* Scan task 2, RB 1 arrives at 30ms

The consumer grabs batches very quickly, so it comes back in at timestamp 15ms and tries to `Pop` from `batches[0]`. It will sit in this wait for about 1ms (because `task_drained[0] == false`) and then time out with the predicate still false. At that point `pop_cursor` is still 0, so the `pop_cursor == batches.size()` check won't catch it, and `Pop` will fall through and pop the front of the empty `batches[0]`.
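A minimal sketch of one way to address this: make the condition variable a member that `Push` and `Finish` notify, and wait without a timeout. The `cv` member and the `notify_one` calls are illustrative additions, not part of the PR as written.

    // Sketch only, not the PR's code.  A shared condition variable lets
    // Pop() block until a producer actually signals progress, instead of
    // polling a temporary condition_variable on a 1ms timeout.
    #include <condition_variable>
    #include <deque>
    #include <memory>
    #include <mutex>
    #include <vector>

    #include "arrow/record_batch.h"
    #include "arrow/status.h"

    using arrow::RecordBatch;
    using arrow::Status;

    struct ToBatchesStateSketch {
      explicit ToBatchesStateSketch(size_t n_tasks)
          : batches(n_tasks), task_drained(n_tasks, false) {}

      std::mutex mutex;
      std::condition_variable cv;  // added: shared by producers and consumer
      std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
      std::vector<bool> task_drained;
      size_t pop_cursor = 0;

      void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
        {
          std::lock_guard<std::mutex> lock(mutex);
          batches[i_task].push_back(std::move(b));
        }
        cv.notify_one();  // wake a Pop() blocked on an empty queue
      }

      Status Finish(size_t position) {
        {
          std::lock_guard<std::mutex> lock(mutex);
          task_drained[position] = true;
        }
        cv.notify_one();  // a drained task can also unblock Pop()
        return Status::OK();
      }

      std::shared_ptr<RecordBatch> Pop() {
        std::unique_lock<std::mutex> lock(mutex);
        // Same predicate as the PR, but with no timeout: the wait returns
        // only once the predicate is true, so the checks below are sound.
        cv.wait(lock, [this] {
          while (pop_cursor < batches.size()) {
            if (!batches[pop_cursor].empty()) return true;  // batch ready
            if (!task_drained[pop_cursor]) return false;    // still producing
            ++pop_cursor;                                    // task exhausted
          }
          return true;  // every task drained
        });
        if (pop_cursor == batches.size()) return nullptr;
        auto batch = std::move(batches[pop_cursor].front());
        batches[pop_cursor].pop_front();
        return batch;
      }
    };

With this shape there is no arbitrary 1ms budget and no ignored return value: the wait ends exactly when a batch arrives, the current task drains, or everything completes.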
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
...
+      while (pop_cursor < batches.size()) {
+        // queue for current scan task contains at least one batch, pop that
+        if (!batches[pop_cursor].empty()) return true;
+
+        // queue is empty but will be appended to eventually, wait for that
+        if (!task_drained[pop_cursor]) return false;
+
+        ++pop_cursor;
+      }
+      // all scan tasks drained, terminate
+      return true;
+    });
+
+    if (pop_cursor == batches.size()) return nullptr;
+
+    auto batch = std::move(batches[pop_cursor].front());
+    batches[pop_cursor].pop_front();
+    return batch;
+  }
+};
+
+Result<RecordBatchIterator> Scanner::ToBatches() {
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_vector, scan_task_it.ToVector());
+
+  auto task_group = scan_context_->TaskGroup();
+  auto state = std::make_shared<ToBatchesState>(scan_task_vector.size());
+
+  size_t scan_task_id = 0;
+  for (auto scan_task : scan_task_vector) {
+    auto id = scan_task_id++;
+    task_group->Append([state, id, scan_task] {

Review comment:
Again, what I'm working on will work around this, so maybe don't stress about it at the moment, but there's no back-pressure here. If the batch consumer is not fast enough and the dataset is larger than RAM, the system will run out of RAM. It's a bit odd because you are fixing ARROW-11800 here (though without back-pressure) and then I'll be breaking it again with my implementation (the first pass of my impl will have back-pressure and parse loaded buffers a bit more serially). A sketch of one bounded-queue shape follows after the last comment below.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
...
+Result<RecordBatchIterator> Scanner::ToBatches() {
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_vector, scan_task_it.ToVector());

Review comment:
I'll be replacing this with something better, so I don't know how much we care to worry, but this is not ideal. For example, with Parquet, this would fetch metadata for every file in the scan before starting to read any individual file. It introduces more latency than necessary. Also, I'm not sure how this will interact with Parquet preloading. A schematic of the lazier alternative follows below.

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -163,12 +164,20 @@ class ARROW_DS_EXPORT Scanner {
   /// in a concurrent fashion and outlive the iterator.
   Result<ScanTaskIterator> Scan();
 
+  /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple
+  /// threads are used, the visitor will be invoked from those threads and is
+  /// responsible for any synchronization.
+  Status Scan(std::function<Status(std::shared_ptr<RecordBatch>)> visitor);

Review comment:
Why both `Scan(visitor)` and `ToBatches`? Couldn't you just do `ToBatches().Visit(visitor)`? (See the sketch below.)
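On the back-pressure comment above, a generic illustration of the missing piece: a bounded queue whose `Push` blocks once `max_queued` items are waiting. This is a sketch of the concept, not the planned implementation; `max_queued` and both condition variable names are hypothetical.

    // Illustration only: producers block instead of buffering without limit
    // when the consumer falls behind.
    #include <condition_variable>
    #include <deque>
    #include <mutex>

    template <typename T>
    class BoundedQueue {
     public:
      explicit BoundedQueue(size_t max_queued) : max_queued_(max_queued) {}

      void Push(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        // Back-pressure: the producing scan task blocks here until the
        // consumer has popped enough items to make room.
        space_available_.wait(lock,
                              [this] { return queue_.size() < max_queued_; });
        queue_.push_back(std::move(item));
        item_available_.notify_one();
      }

      T Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        item_available_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop_front();
        space_available_.notify_one();
        return item;
      }

     private:
      std::mutex mutex_;
      std::condition_variable space_available_;
      std::condition_variable item_available_;
      std::deque<T> queue_;
      size_t max_queued_;
    };

Note the catch: blocking inside a `TaskGroup` task ties up a thread-pool thread, and with enough blocked producers a fixed-size pool can stop making progress entirely, which is part of why back-pressure really wants to live in the scheduling layer rather than be bolted onto this state object.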
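On the `ToVector()` comment, a schematic of the lazier shape: pull scan tasks from the iterator one at a time, so earlier files can start reading while later files' metadata is still being fetched. This assumes it lives in scanner.cc next to the PR's code; `context()` is a hypothetical accessor for `scan_context_`, and a correct version would also need some "no more tasks" signal (omitted here), since otherwise `Pop` could conclude that every known task is drained before discovery has finished.

    // Schematic sketch, not the planned replacement.  Relies on the PR's
    // ToBatchesState, whose Push() resizes its vectors for unseen task ids,
    // so the total task count need not be known up front.
    Result<RecordBatchIterator> ToBatchesLazySketch(Scanner* scanner) {
      ARROW_ASSIGN_OR_RAISE(auto scan_task_it, scanner->Scan());
      auto task_group = scanner->context()->TaskGroup();  // hypothetical
      auto state = std::make_shared<ToBatchesState>(/*n_tasks=*/0);

      size_t scan_task_id = 0;
      for (auto maybe_scan_task : scan_task_it) {  // tasks pulled lazily
        ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
        auto id = scan_task_id++;
        // With a threaded TaskGroup, this task can begin executing (and
        // reading its file) before the next iteration discovers the next
        // task, e.g. before the next Parquet footer is fetched.
        task_group->Append([state, id, scan_task] {
          ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
          for (auto maybe_batch : batch_it) {
            ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
            state->Push(std::move(batch), id);
          }
          return state->Finish(id);
        });
      }
      // Signal "no more tasks" here, then wrap `state` in a
      // RecordBatchIterator as the PR does (both omitted).
      return Status::NotImplemented("sketch only");
    }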
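And on the last question, the suggested spelling would look roughly like this (`Iterator<T>::Visit` already exists in arrow/util/iterator.h; the body below is a hypothetical implementation of the PR's visitor overload, not the PR's actual code):

    // Express the visitor overload in terms of ToBatches().  Visit() calls
    // the callable on each element and stops at the first non-OK Status.
    Status Scanner::Scan(
        std::function<Status(std::shared_ptr<RecordBatch>)> visitor) {
      ARROW_ASSIGN_OR_RAISE(auto batch_it, ToBatches());
      return batch_it.Visit(std::move(visitor));
    }

One wrinkle: this drains batches on the calling thread, while the doc comment in the diff says the visitor may be invoked concurrently from the scanning threads, so the two entry points aren't quite interchangeable as documented.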
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org