westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r564754374
##########
File path: cpp/src/arrow/util/iterator.cc
##########
@@ -119,14 +123,30 @@ class ReadaheadQueue::Impl : public std::enable_shared_from_this<ReadaheadQueue:
   void DoWork() {
     std::unique_lock<std::mutex> lock(mutex_);
     while (!please_shutdown_) {
-      while (static_cast<int64_t>(done_.size()) < max_readahead_ && todo_.size() > 0) {
+      while (todo_.size() > 0 &&
+             ((max_readahead_ <= 0) ||
+              (static_cast<int64_t>(done_.size()) < max_readahead_))) {
         auto promise = std::move(todo_.front());
         todo_.pop_front();
         lock.unlock();
-        promise->Call();
+        if (promise->Call()) {
+          // If the call finished then we should purge the remaining TODO items, marking
+          // them finished
+          lock.lock();
+          std::deque<std::unique_ptr<ReadaheadPromise>> to_clear(std::move(todo_));
+          // While the async iterator doesn't use todo_ anymore after it hits a finish the
+          // sync iterator might still due to timing so leave it valid
+          todo_.clear();
+          lock.unlock();
+          for (auto&& promise : to_clear) {
+            promise->End();
+          }
+        }
         lock.lock();
-        done_.push_back(std::move(promise));
-        work_done_.notify_one();
+        if (max_readahead_ > 0) {
+          done_.push_back(std::move(promise));
+          work_done_.notify_one();
+        }
Review comment:
I think we will want a general-purpose producer/consumer queue with both
synchronous and asynchronous APIs. The bounding of AddReadahead by the "done"
pool happens in AddReadahead (and not in the readahead queue). I agree it needs
some simplification, but I found the old method equally confusing. Both
approaches should be able to share a single multi-producer / multi-consumer
"queue" API (single-producer / single-consumer may be sufficient if there are
performance benefits to such a queue).
I will take a pass at simplifying this.
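
As a rough illustration of the kind of shared queue I have in mind, here is a
minimal sketch of an unbounded producer/consumer queue exposing both a blocking
(synchronous) and a future-based (asynchronous) consumer API. The names are
hypothetical and it uses std::future only for brevity; a real Arrow version
would presumably be built on arrow::Future and would also need shutdown and
end-of-stream handling, which are omitted here.

```cpp
#include <condition_variable>
#include <deque>
#include <future>
#include <mutex>
#include <utility>

// Hypothetical sketch only -- the names and the use of std::future are mine,
// not existing Arrow APIs.
template <typename T>
class ProducerConsumerQueue {
 public:
  // Producer side: hand the item directly to a waiting async consumer if
  // there is one, otherwise buffer it and wake any blocked sync consumer.
  void Push(T item) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (!waiting_consumers_.empty()) {
      std::promise<T> consumer = std::move(waiting_consumers_.front());
      waiting_consumers_.pop_front();
      lock.unlock();
      consumer.set_value(std::move(item));
      return;
    }
    items_.push_back(std::move(item));
    lock.unlock();
    cv_.notify_one();
  }

  // Synchronous consumer API: block until an item is available.
  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !items_.empty(); });
    T item = std::move(items_.front());
    items_.pop_front();
    return item;
  }

  // Asynchronous consumer API: the returned future completes as soon as an
  // item is (or becomes) available.
  std::future<T> PopAsync() {
    std::unique_lock<std::mutex> lock(mutex_);
    if (!items_.empty()) {
      std::promise<T> ready;
      ready.set_value(std::move(items_.front()));
      items_.pop_front();
      return ready.get_future();
    }
    waiting_consumers_.emplace_back();
    return waiting_consumers_.back().get_future();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<T> items_;                            // buffered items
  std::deque<std::promise<T>> waiting_consumers_;  // pending async pops
};
```

The sync readahead path and the async generator path could then push into the
same queue and only differ in which Pop they call; back-pressure (the
max_readahead bound) would stay in the caller, as it does today for
AddReadahead.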