westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r566551201
##########
File path: cpp/src/arrow/util/iterator.cc
##########
@@ -119,14 +123,30 @@ class ReadaheadQueue::Impl : public std::enable_shared_from_this<ReadaheadQueue:
   void DoWork() {
     std::unique_lock<std::mutex> lock(mutex_);
     while (!please_shutdown_) {
-      while (static_cast<int64_t>(done_.size()) < max_readahead_ && todo_.size() > 0) {
+      while (todo_.size() > 0 &&
+             ((max_readahead_ <= 0) ||
+              (static_cast<int64_t>(done_.size()) < max_readahead_))) {
         auto promise = std::move(todo_.front());
         todo_.pop_front();
         lock.unlock();
-        promise->Call();
+        if (promise->Call()) {
+          // If the call finished then we should purge the remaining TODO items, marking
+          // them finished
+          lock.lock();
+          std::deque<std::unique_ptr<ReadaheadPromise>> to_clear(std::move(todo_));
+          // While the async iterator doesn't use todo_ anymore after it hits a finish the
+          // sync iterator might still due to timing so leave it valid
+          todo_.clear();
+          lock.unlock();
+          for (auto&& promise : to_clear) {
+            promise->End();
+          }
+        }
         lock.lock();
-        done_.push_back(std::move(promise));
-        work_done_.notify_one();
+        if (max_readahead_ > 0) {
+          done_.push_back(std::move(promise));
+          work_done_.notify_one();
+        }
Review comment:
Ok. I've tracked down the oddness here, and unwinding it actually removed quite a
bit of complexity. It turns out that readahead is an inherently asynchronous
concept: the ReadaheadQueue was an asynchronous generator pipeline in
disguise.
ReadaheadQueueImpl is really part executor and part AddReadahead.
Part executor: ReadaheadQueueImpl is a thread pool of size 1. It has a
worker thread (the background thread) and a job queue
(ReadaheadQueueImpl::todo_), and it accepts jobs on that queue and runs them.
The job types are even named "promises" (ReadaheadPromise).
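As a rough illustration of the "part executor" half, here is a minimal sketch of a thread pool of size 1: one worker thread draining an unbounded FIFO job queue. `SerialExecutor` and `Submit` are hypothetical names chosen for this sketch; this is not Arrow's `ThreadPool` API.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-worker executor: one background thread draining an
// unbounded FIFO job queue (roughly the role ReadaheadQueueImpl::todo_ played).
class SerialExecutor {
 public:
  SerialExecutor() : worker_([this] { Run(); }) {}

  SerialExecutor(const SerialExecutor&) = delete;
  SerialExecutor& operator=(const SerialExecutor&) = delete;

  // Request shutdown; jobs already queued still run before the worker exits.
  ~SerialExecutor() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      shutdown_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  // Enqueue a job. Never blocks on capacity because the queue is unbounded.
  void Submit(std::function<void()> job) {
    assert(job && "Submit() needs a callable job");
    {
      std::lock_guard<std::mutex> lock(mutex_);
      jobs_.push_back(std::move(job));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      cv_.wait(lock, [this] { return shutdown_ || !jobs_.empty(); });
      if (jobs_.empty()) return;  // shutdown requested and queue drained
      std::function<void()> job = std::move(jobs_.front());
      jobs_.pop_front();
      lock.unlock();
      job();  // run outside the lock so Submit() is never held up by a job
      lock.lock();
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> jobs_;
  bool shutdown_ = false;
  // Declared last so every member above is initialized before Run() starts.
  std::thread worker_;
};
```

Because there is exactly one worker, jobs run one at a time and in submission order, which is what lets a blocking iterator be read safely from the background.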
Part readahead: the readahead queue is ReadaheadQueueImpl::done_, plus all of
the pumping logic that keeps it filled.
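A minimal sketch of the "part readahead" half, assuming an async generator is modeled as a `std::function` returning a `std::shared_future<T>`. `AsyncGen` and `MakeReadahead` are hypothetical names, not Arrow's `AddReadahead` signature. The sketch is lazy (nothing is requested from the source until the first read) and assumes the source may be called past the end of the stream, since it does no end-of-stream bookkeeping.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Hypothetical async generator: each call returns a future for the next item.
template <typename T>
using AsyncGen = std::function<std::shared_future<T>()>;

// Bounded readahead over `source`. Nothing is requested until the first call;
// that call fills the queue with `max_readahead` requests. Every call then
// hands back the oldest future and requests one more, so the queue never
// holds more than `max_readahead` outstanding futures.
// Assumes `source` may be called past the end of the stream (e.g. it keeps
// returning an end marker); this sketch does no end-of-stream bookkeeping.
template <typename T>
AsyncGen<T> MakeReadahead(AsyncGen<T> source, std::size_t max_readahead) {
  assert(max_readahead > 0 && "readahead needs room for at least one item");
  auto queue = std::make_shared<std::deque<std::shared_future<T>>>();
  return [source = std::move(source), max_readahead, queue]() {
    if (queue->empty()) {
      // First read: pump the queue up to its bound.
      for (std::size_t i = 0; i < max_readahead; ++i) {
        queue->push_back(source());
      }
    }
    // Pop one, add one: the queue stays at max_readahead entries.
    std::shared_future<T> result = queue->front();
    queue->pop_front();
    queue->push_back(source());
    return result;
  };
}
```

While a consumer works on one item, up to `max_readahead` later items may already be in flight wherever the source schedules its work.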
So I got rid of it and replaced it with four pieces: a thread pool of size 1
(which has an unbounded job queue); BackgroundGenerator (which now takes an
executor and an iterator and creates an AsyncGenerator); AddReadahead (which
creates the readahead queue, and this one is bounded in size); and finally
GeneratorIterator, which brings us back to a synchronous iterator by waiting
on each result.
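A minimal sketch of the background and waiting stages, under simplifying assumptions: the synchronous iterator is a `std::function` that returns `std::nullopt` at the end, futures are `std::shared_future`, and the executor is any callable that accepts a job and (as with a thread pool of size 1) runs jobs one at a time in submission order. `SyncIter`, `AsyncGen`, `Executor`, `MakeBackgroundGen`, and `WaitingIterator` are hypothetical names; the real Arrow signatures may differ.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <utility>

// Hypothetical aliases for this sketch (not Arrow's API).
// Synchronous iterator: returns std::nullopt once exhausted.
template <typename T>
using SyncIter = std::function<std::optional<T>()>;
// Async generator: each call returns a future for the next item.
template <typename T>
using AsyncGen = std::function<std::shared_future<std::optional<T>>()>;
// Executor: accepts a job. Assumed to run jobs one at a time, in submission
// order (a thread pool of size 1), so reads happen in the order requested.
using Executor = std::function<void(std::function<void()>)>;

// Background stage: each call schedules one blocking read on the executor
// and immediately returns a future for its result.
template <typename T>
AsyncGen<T> MakeBackgroundGen(SyncIter<T> it, Executor executor) {
  assert(it && executor && "both an iterator and an executor are required");
  auto shared_it = std::make_shared<SyncIter<T>>(std::move(it));
  return [shared_it, executor = std::move(executor)]() {
    // std::function needs copyable callables, so share the move-only promise.
    auto promise = std::make_shared<std::promise<std::optional<T>>>();
    std::shared_future<std::optional<T>> future = promise->get_future().share();
    executor([shared_it, promise] {
      try {
        promise->set_value((*shared_it)());
      } catch (...) {
        promise->set_exception(std::current_exception());
      }
    });
    return future;
  };
}

// Synchronous stage: turns the async generator back into an iterator by
// waiting on each future in turn (rethrows any error from the read).
template <typename T>
class WaitingIterator {
 public:
  explicit WaitingIterator(AsyncGen<T> gen) : gen_(std::move(gen)) {}
  std::optional<T> Next() { return gen_().get(); }

 private:
  AsyncGen<T> gen_;
};
```

In the pipeline described above, the bounded readahead stage would wrap the background generator before it is handed to the waiting iterator.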
All the tests pass, but the expected behavior in the tracing test changed
slightly: the readahead now doesn't start pumping until the first read. We can
change that if we want, so that it behaves more like the threaded reader and
less like the serial reader.
I've made the change on a separate branch from this one so that it can be
reviewed independently (https://github.com/westonpace/arrow/pull/3).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.