lidavidm commented on a change in pull request #12662:
URL: https://github.com/apache/arrow/pull/12662#discussion_r830244406
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -60,6 +60,9 @@ namespace arrow {
// Readahead operators, and some other operators, may introduce queueing. Any operators
// that introduce buffering should detail the amount of buffering they introduce in their
// MakeXYZ function comments.
+//
+// A generator should always be fully consumed before it is destroyed.
+// A generator should not emit a terminal item until it has finished all ongoing futures.
Review comment:
Here, does "emit a terminal item" mean something like "let a future complete
with an error or an end-of-stream marker"? Or, I guess, "emit an item (future)
that will terminate the stream when resolved"? The wording reads a little oddly
to me because it makes it sound like the generator can see the future.
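One possible reading of the rule, sketched as a toy model: the generator only lets a terminal item (an error or the end marker) through once no other work is in flight. `ToyGenerator` and all of its members are hypothetical names for illustration, not Arrow's API, and the single-threaded bookkeeping is an assumption made to keep the sketch short.

```cpp
#include <cassert>

// Hypothetical toy model, not Arrow's API: tracks in-flight work and holds
// back the terminal item until that work has finished.
class ToyGenerator {
 public:
  // Called when a future is handed out and its work starts.
  void StartTask() { ++in_flight_; }

  // Called when a previously started future completes.
  void FinishTask() {
    assert(in_flight_ > 0);
    --in_flight_;
  }

  // Called when the source has nothing more to produce (or has failed).
  void MarkSourceExhausted() { exhausted_ = true; }

  // True only when it is safe to resolve a future with a terminal item.
  bool MayEmitTerminalItem() const { return exhausted_ && in_flight_ == 0; }

 private:
  int in_flight_ = 0;
  bool exhausted_ = false;
};
```

Under this reading, the rule is about when a future is allowed to resolve with the terminal item, not about the generator knowing in advance which future will carry it.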
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -775,6 +791,7 @@ class ReadaheadGenerator {
if (state_->finished.load()) {
state_->readahead_queue.push(AsyncGeneratorEnd<T>());
} else {
+ state_->num_running.fetch_add(1);
Review comment:
Though I guess we're comparing with 1 instead of 0 up above... 'num_running'
doesn't sound like quite the right name to me, but it's also unclear how to
articulate exactly what this counter represents.
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -775,6 +791,7 @@ class ReadaheadGenerator {
if (state_->finished.load()) {
state_->readahead_queue.push(AsyncGeneratorEnd<T>());
} else {
+ state_->num_running.fetch_add(1);
Review comment:
This seems off; that means on the initial call, we'll set num_running to
max_readahead + 1?
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1058,18 +1109,73 @@ class MergedGenerator {
return source();
}
+ void SignalErrorUnlocked() {
+ broken = true;
+ // Empty any results that have arrived but not asked for.
+ while (!delivered_jobs.empty()) {
+ delivered_jobs.pop_front();
+ }
+ }
+
+ void Purge() {
+ while (!waiting_jobs.empty()) {
+ waiting_jobs.front()->MarkFinished(IterationEnd<T>());
+ waiting_jobs.pop_front();
+ }
+ }
+
+ void MarkFinished() {
+ all_finished.MarkFinished();
+ Purge();
+ }
+
+ // This is called outside the mutex but it is only ever called
+ // once and Future<>::AddCallback is thread-safe
+ void MarkFinalError(const Status& err, Future<T> maybe_sink) {
+ if (maybe_sink.is_valid()) {
+ // Someone is waiting for this error so lets mark it complete when
+ // all the work is done
+ // all_finished will get called by something with a strong pointer to state
+ // so we can safely capture this
+ all_finished.AddCallback([maybe_sink, err](const Status& status) mutable {
+ maybe_sink.MarkFinished(err);
+ });
+ } else {
+ // No one is waiting for this error right now so it will be delivered
+ // next.
+ final_error = err;
+ }
+ }
+
+ bool IsComplete() {
+ return outstanding_requests == 0 &&
+ (broken || (source_exhausted && num_running_subscriptions == 0 &&
+ delivered_jobs.empty()));
+ }
+
+ bool MarkTaskFinishedUnlocked() {
+ --outstanding_requests;
+ return IsComplete();
+ }
+
AsyncGenerator<AsyncGenerator<T>> source;
Review comment:
I think, for the sake of future readers, we should give each of these
fields a description and an invariant (and probably each of the structs too).
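To illustrate the kind of documentation I have in mind, here is a rough sketch. `DocumentedState` is a hypothetical, simplified stand-in: the field names and the `IsComplete` condition are taken from the diff, but the descriptions and invariants are my guesses at the intent, and `int` replaces the real job type to keep it self-contained.

```cpp
#include <cassert>
#include <deque>

// Hypothetical, simplified stand-in for the merged generator's state.
// Field names come from the diff; the descriptions are assumed, not confirmed.
struct DocumentedState {
  // Results that have arrived but have not been asked for yet.
  // Invariant (assumed): empty whenever `broken` is true.
  std::deque<int> delivered_jobs;

  // Number of requests still in flight (assumed meaning).
  // Invariant (assumed): never negative; must reach 0 before completion.
  int outstanding_requests = 0;

  // Number of inner generators still being read from (assumed meaning).
  int num_running_subscriptions = 0;

  // Set once the outer source has reported end-of-stream (assumed meaning).
  bool source_exhausted = false;

  // Set once an error has been signalled; never reset afterwards.
  bool broken = false;

  // Same condition as IsComplete() in the diff.
  bool IsComplete() const {
    return outstanding_requests == 0 &&
           (broken || (source_exhausted && num_running_subscriptions == 0 &&
                       delivered_jobs.empty()));
  }

  // Mirrors SignalErrorUnlocked() in the diff: mark broken, drop results.
  void SignalErrorUnlocked() {
    broken = true;
    delivered_jobs.clear();
  }
};
```

Writing the invariants down like this would also make it easier to check that every code path that sets `broken` also empties `delivered_jobs`.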