lidavidm commented on a change in pull request #12662:
URL: https://github.com/apache/arrow/pull/12662#discussion_r830244406



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -60,6 +60,9 @@ namespace arrow {
 // Readahead operators, and some other operators, may introduce queueing.  Any operators
 // that introduce buffering should detail the amount of buffering they introduce in their
 // MakeXYZ function comments.
+//
+// A generator should always be fully consumed before it is destroyed.
+// A generator should not emit a terminal item until it has finished all ongoing futures.

Review comment:
       Here, does "emit a terminal item" mean something like "let a future
       terminate with an error or an end-of-stream marker"? Or, I guess, "an item
       (future) that will terminate the stream when resolved". The wording reads
       a little oddly to me because it sounds like the generator can see the future.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -775,6 +791,7 @@ class ReadaheadGenerator {
     if (state_->finished.load()) {
       state_->readahead_queue.push(AsyncGeneratorEnd<T>());
     } else {
+      state_->num_running.fetch_add(1);

Review comment:
       Though I guess we're comparing with 1 instead of 0 up above… `num_running`
       doesn't sound like quite the right name to me, but it's also unclear how
       to articulate exactly what this counter represents.
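A minimal sketch of one reading of the "1 instead of 0" comparison, assuming the check above is made on the return value of `fetch_sub`. `std::atomic::fetch_sub` returns the value held before the subtraction, so a result of 1 means the counter has just reached 0. `ReleaseOne` is a hypothetical helper written for illustration and is not part of Arrow.

```cpp
#include <atomic>

// Hypothetical helper (not Arrow code): decrements the counter and reports
// whether this call released the last outstanding unit.
// fetch_sub returns the value held *before* the subtraction, so
// "previous == 1" is equivalent to "the counter is now 0".
inline bool ReleaseOne(std::atomic<int>& counter) {
  return counter.fetch_sub(1) == 1;
}
```

If that is what the comparison does, the counter reads as "units still owed before the final item may be delivered", which might suggest a more descriptive name.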

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -775,6 +791,7 @@ class ReadaheadGenerator {
     if (state_->finished.load()) {
       state_->readahead_queue.push(AsyncGeneratorEnd<T>());
     } else {
+      state_->num_running.fetch_add(1);

Review comment:
       This seems off: doesn't that mean that on the initial call we'll set
       num_running to max_readahead + 1?
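To make the arithmetic concrete, here is a toy model of the counter. It assumes that the first pull fills the queue with `max_readahead` requests (incrementing the counter for each) and then falls through to the refill branch shown in the diff. `ToyReadaheadState` and `ToyPull` are hypothetical stand-ins, plain ints replace futures, and nothing ever completes, so the counter is never decremented.

```cpp
#include <atomic>
#include <queue>

// Simplified stand-in for the readahead state; not the Arrow class.
struct ToyReadaheadState {
  explicit ToyReadaheadState(int max_readahead) : max_readahead(max_readahead) {}
  int max_readahead;
  std::atomic<int> num_running{0};
  std::atomic<bool> finished{false};
  std::queue<int> readahead_queue;  // ints stand in for futures
};

// Assumed shape of one pull: fill on the first call, pop one item, refill.
inline int ToyPull(ToyReadaheadState& state) {
  if (state.readahead_queue.empty()) {
    // Assumption: the first call launches max_readahead requests up front.
    for (int i = 0; i < state.max_readahead; ++i) {
      state.num_running.fetch_add(1);
      state.readahead_queue.push(i);
    }
  }
  int result = state.readahead_queue.front();
  state.readahead_queue.pop();
  if (!state.finished.load()) {
    // Mirrors the line added in the diff: one more request per pull.
    state.num_running.fetch_add(1);
    state.readahead_queue.push(-1);
  }
  return result;
}
```

Under these assumptions, a single `ToyPull` on a state built with `max_readahead = 4` leaves the counter at 5 while the queue holds only 4 entries.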

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1058,18 +1109,73 @@ class MergedGenerator {
       return source();
     }
 
+    void SignalErrorUnlocked() {
+      broken = true;
+      // Empty any results that have arrived but not asked for.
+      while (!delivered_jobs.empty()) {
+        delivered_jobs.pop_front();
+      }
+    }
+
+    void Purge() {
+      while (!waiting_jobs.empty()) {
+        waiting_jobs.front()->MarkFinished(IterationEnd<T>());
+        waiting_jobs.pop_front();
+      }
+    }
+
+    void MarkFinished() {
+      all_finished.MarkFinished();
+      Purge();
+    }
+
+    // This is called outside the mutex but it is only ever called
+    // once and Future<>::AddCallback is thread-safe
+    void MarkFinalError(const Status& err, Future<T> maybe_sink) {
+      if (maybe_sink.is_valid()) {
+        // Someone is waiting for this error so lets mark it complete when
+        // all the work is done
+        // all_finished will get called by something with a strong pointer to state
+        // so we can safely capture this
+          all_finished.AddCallback([maybe_sink, err](const Status& status) mutable {
+          maybe_sink.MarkFinished(err);
+        });
+      } else {
+        // No one is waiting for this error right now so it will be delivered
+        // next.
+        final_error = err;
+      }
+    }
+
+    bool IsComplete() {
+      return outstanding_requests == 0 &&
+             (broken || (source_exhausted && num_running_subscriptions == 0 &&
+                         delivered_jobs.empty()));
+    }
+
+    bool MarkTaskFinishedUnlocked() {
+      --outstanding_requests;
+      return IsComplete();
+    }
+
     AsyncGenerator<AsyncGenerator<T>> source;

Review comment:
       I think for the sake of future readers we should give each of these
       fields a description and an invariant (and probably each of the structs too).
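As a rough illustration of the kind of per-field documentation meant here: the field names follow the diff, but every description and invariant below is a guess made for the sake of the example, not the authors' documentation. `ToyMergedState` is a hypothetical stand-in, and a count replaces the `delivered_jobs` queue.

```cpp
#include <cstddef>

// Illustrative only: descriptions and invariants are assumptions.
struct ToyMergedState {
  // Requests handed to callers that have not yet been answered.
  // Invariant (assumed): never negative.
  int outstanding_requests = 0;
  // Inner generators that are still producing items.
  // Invariant (assumed): never negative.
  int num_running_subscriptions = 0;
  // Results that arrived before anyone asked for them
  // (a count stands in for the delivered_jobs queue).
  std::size_t num_delivered_jobs = 0;
  // True once the outer source has reported end-of-stream.
  bool source_exhausted = false;
  // True once an error has been seen; buffered results are discarded.
  bool broken = false;

  // Same predicate as IsComplete() in the diff, with the queue check
  // replaced by the count above.
  bool IsComplete() const {
    return outstanding_requests == 0 &&
           (broken || (source_exhausted && num_running_subscriptions == 0 &&
                       num_delivered_jobs == 0));
  }
};
```

Writing the fields out this way also makes the completion condition easier to check: nothing may be outstanding, and either the stream is broken or every source of items has been drained.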



