westonpace commented on a change in pull request #9533:
URL: https://github.com/apache/arrow/pull/9533#discussion_r579564527



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -250,30 +250,40 @@ class ConcreteFutureImpl : public FutureImpl {
   }
 
   void DoMarkFinishedOrFailed(FutureState state) {
-    {
-      // Lock the hypothetical waiter first, and the future after.
-      // This matches the locking order done in FutureWaiter constructor.
-      std::unique_lock<std::mutex> waiter_lock(global_waiter_mutex);
-      std::unique_lock<std::mutex> lock(mutex_);
-
-      DCHECK(!IsFutureFinished(state_)) << "Future already marked finished";
-      state_ = state;
-      if (waiter_ != nullptr) {
-        waiter_->MarkFutureFinishedUnlocked(waiter_arg_, state);
-      }
+    // Lock the hypothetical waiter first, and the future after.
+    // This matches the locking order done in FutureWaiter constructor.
+    std::unique_lock<std::mutex> waiter_lock(global_waiter_mutex);
+    std::unique_lock<std::mutex> lock(mutex_);
+
+    DCHECK(!IsFutureFinished(state_)) << "Future already marked finished";
+    state_ = state;
+    if (waiter_ != nullptr) {
+      waiter_->MarkFutureFinishedUnlocked(waiter_arg_, state);
     }
+
+    // Waiters are alerted before callbacks are run
+    waiter_lock.unlock();
     cv_.notify_all();
 
-    // run callbacks, lock not needed since the future is finsihed by this
-    // point so nothing else can modify the callbacks list and it is safe
-    // to iterate.
-    //
-    // In fact, it is important not to hold the locks because the callback
-    // may be slow or do its own locking on other resources
-    for (auto&& callback : callbacks_) {
-      std::move(callback)();
+    std::vector<Callback> callbacks_to_run = std::move(callbacks_);

Review comment:
@pitrou @bkietz This adds a bit of overhead to callback execution, but it ensures that `TryAddCallback` will only return false once all previously added callbacks have finished. Callbacks can now be added while the future is in the process of being completed, which opens the door to concurrent access to `callbacks_`, hence the lock and the move.

Note that this is unrelated to the lock-free queue. My first iteration of serial readahead used a mutex and `std::queue` and still ran into this problem.
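
For readers following along, here is a minimal sketch of the lock-and-move idea in isolation. It is not the code in this PR: `MiniFuture`, `drained_`, and `RunDemo` are hypothetical names, and the drain loop is just one possible way to get the "false only after everything queued has run" guarantee.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a future; NOT Arrow's FutureImpl.
class MiniFuture {
 public:
  using Callback = std::function<void()>;

  // Returns true if the callback was queued (the finishing thread runs it).
  // Returns false only after every previously queued callback has run.
  bool TryAddCallback(Callback cb) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (drained_) return false;
    callbacks_.push_back(std::move(cb));
    return true;
  }

  void MarkFinished() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!callbacks_.empty()) {
      // Move the batch out while holding the lock, so concurrent adders
      // never touch the vector that is being iterated.
      std::vector<Callback> to_run = std::move(callbacks_);
      callbacks_.clear();  // make the moved-from vector's empty state explicit
      // Run callbacks without the lock: they may be slow or take other locks.
      lock.unlock();
      for (auto& cb : to_run) cb();
      lock.lock();
    }
    // Set under the lock, and only once nothing is left to run.
    drained_ = true;
  }

 private:
  std::mutex mutex_;
  std::vector<Callback> callbacks_;
  bool drained_ = false;
};

// Hypothetical demo: a callback registers another callback mid-completion.
int RunDemo() {
  MiniFuture fut;
  int ran = 0;
  fut.TryAddCallback([&] {
    ++ran;
    bool queued = fut.TryAddCallback([&] { ++ran; });
    assert(queued);  // still accepted, because completion is in progress
    (void)queued;
  });
  fut.MarkFinished();
  bool late = fut.TryAddCallback([&] { ++ran; });
  assert(!late);  // rejected: everything queued earlier has already run
  (void)late;
  return ran;
}
```

In this sketch the extra cost is one vector move plus a lock/unlock pair per batch, which is the kind of overhead mentioned above.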





