westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r565987394



##########
File path: cpp/src/arrow/util/iterator.cc
##########
@@ -119,14 +123,30 @@ class ReadaheadQueue::Impl : public std::enable_shared_from_this<ReadaheadQueue:
   void DoWork() {
     std::unique_lock<std::mutex> lock(mutex_);
     while (!please_shutdown_) {
-      while (static_cast<int64_t>(done_.size()) < max_readahead_ && todo_.size() > 0) {
+      while (todo_.size() > 0 &&
+             ((max_readahead_ <= 0) ||
+              (static_cast<int64_t>(done_.size()) < max_readahead_))) {
         auto promise = std::move(todo_.front());
         todo_.pop_front();
         lock.unlock();
-        promise->Call();
+        if (promise->Call()) {
+          // If the call finished then we should purge the remaining TODO items, marking
+          // them finished
+          lock.lock();
+          std::deque<std::unique_ptr<ReadaheadPromise>> to_clear(std::move(todo_));
+          // While the async iterator doesn't use todo_ anymore after it hits a finish the
+          // sync iterator might still due to timing so leave it valid
+          todo_.clear();
+          lock.unlock();
+          for (auto&& promise : to_clear) {
+            promise->End();
+          }
+        }
         lock.lock();
-        done_.push_back(std::move(promise));
-        work_done_.notify_one();
+        if (max_readahead_ > 0) {
+          done_.push_back(std::move(promise));
+          work_done_.notify_one();
+        }

Review comment:
       Also, it isn't "unbounded readahead". Both implementations currently
have two queues. In the synchronous case both queues (`todo_` and `done_`)
live in `ReadaheadQueue::Impl`. In the asynchronous case one queue (`todo_`)
is in `ReadaheadQueue::Impl` and the other (`readahead_queue_`) is in
`ReadaheadGenerator`. The asynchronous case enforces the queue length through
`readahead_queue_`: if `readahead_queue_` isn't drained fast enough, nothing
new is inserted into `todo_`, so the background thread blocks.
   
   That said, I can see that none of this is clear from the code. I still
plan on taking a pass at simplifying things.
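
   To make the asynchronous arrangement concrete, here is a minimal standalone
sketch of the idea, not the code in this PR. `ReadaheadSketch`, `Enqueue`,
`Next` and `CountProduced` are hypothetical names made up for the illustration;
only `todo_` and `readahead_queue_` mirror the members discussed above. It
assumes a single consumer thread and a source that never finishes. The consumer
owns `readahead_queue_` and only adds a request to `todo_` after it has taken a
result out, so the worker never runs more than `max_readahead` items ahead.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>

// Hypothetical illustration only; this is not Arrow's API.
class ReadaheadSketch {
 public:
  ReadaheadSketch(std::function<int()> source, std::size_t max_readahead)
      : source_(std::move(source)), worker_([this] { DoWork(); }) {
    // Prime the pipeline. After this, requests are only added one-for-one
    // as results are consumed, so the bound holds for the object's lifetime.
    for (std::size_t i = 0; i < max_readahead; ++i) Enqueue();
  }

  ~ReadaheadSketch() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      shutdown_ = true;
    }
    work_available_.notify_one();
    worker_.join();
  }

  // Consumer side: take the oldest result, then request exactly one more.
  int Next() {
    assert(!readahead_queue_.empty());  // requires max_readahead > 0
    std::future<int> result = std::move(readahead_queue_.front());
    readahead_queue_.pop_front();
    int value = result.get();
    Enqueue();
    return value;
  }

 private:
  void Enqueue() {
    std::packaged_task<int()> task([this] { return source_(); });
    readahead_queue_.push_back(task.get_future());
    {
      std::lock_guard<std::mutex> lock(mutex_);
      todo_.push_back(std::move(task));
    }
    work_available_.notify_one();
  }

  void DoWork() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      // The worker blocks here whenever the consumer has not asked for more.
      work_available_.wait(lock, [this] { return shutdown_ || !todo_.empty(); });
      if (shutdown_) return;
      std::packaged_task<int()> task = std::move(todo_.front());
      todo_.pop_front();
      lock.unlock();
      task();  // run the source without holding the lock
      lock.lock();
    }
  }

  std::function<int()> source_;
  std::mutex mutex_;
  std::condition_variable work_available_;
  std::deque<std::packaged_task<int()>> todo_;    // guarded by mutex_
  std::deque<std::future<int>> readahead_queue_;  // consumer thread only
  bool shutdown_ = false;                         // guarded by mutex_
  std::thread worker_;  // declared last so it starts after the other members
};

// Consumes n items and returns how many items the source produced in total.
int CountProduced(std::size_t max_readahead, int n) {
  std::atomic<int> produced{0};
  {
    ReadaheadSketch readahead([&produced] { return produced++; }, max_readahead);
    for (int i = 0; i < n; ++i) readahead.Next();
  }  // destructor joins the worker before produced is read
  return produced.load();
}
```

   With `max_readahead` set to 2 and three items consumed, the source is
called at least 3 and at most 5 times: the worker can be ahead by the two
outstanding requests but no further, because nothing else is ever placed in
`todo_`.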



