westonpace commented on a change in pull request #12662:
URL: https://github.com/apache/arrow/pull/12662#discussion_r832733512
##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -1376,6 +1521,59 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
}
}
+TEST(TestAsyncUtil, ReadaheadFailedWaitForInFlight) {
+ ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(20));
+ // If a failure causes an early end then we should not emit that failure
+ // until all in-flight futures have completed. This is to prevent tasks from
+ // outliving the generator
+ std::atomic<int32_t> counter(0);
+ auto failure_gating_task = GatingTask::Make();
+ auto in_flight_gating_task = GatingTask::Make();
+ auto source = [&]() -> Future<TestInt> {
+ auto count = counter++;
+ return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
+ if (count == 0) {
+ failure_gating_task->Task()();
+ return Status::Invalid("X");
+ }
+ in_flight_gating_task->Task()();
+ // These are our in-flight tasks
+ return TestInt(0);
+ }));
+ };
+ auto readahead = MakeReadaheadGenerator<TestInt>(source, 10);
+ auto should_be_invalid = readahead();
+ ASSERT_OK(in_flight_gating_task->WaitForRunning(10));
+ ASSERT_OK(failure_gating_task->Unlock());
+ SleepABit();
+ // Can't be finished because in-flight tasks are still running
+ AssertNotFinished(should_be_invalid);
+ ASSERT_OK(in_flight_gating_task->Unlock());
+}
Review comment:
We should (and now do)
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1058,18 +1109,73 @@ class MergedGenerator {
return source();
}
+ void SignalErrorUnlocked() {
+ broken = true;
+ // Empty any results that have arrived but not asked for.
+ while (!delivered_jobs.empty()) {
+ delivered_jobs.pop_front();
+ }
+ }
+
+ void Purge() {
Review comment:
Done.
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1058,18 +1109,73 @@ class MergedGenerator {
return source();
}
+ void SignalErrorUnlocked() {
+ broken = true;
+ // Empty any results that have arrived but not asked for.
+ while (!delivered_jobs.empty()) {
+ delivered_jobs.pop_front();
+ }
+ }
+
+ void Purge() {
+ while (!waiting_jobs.empty()) {
+ waiting_jobs.front()->MarkFinished(IterationEnd<T>());
+ waiting_jobs.pop_front();
+ }
+ }
+
+ void MarkFinished() {
Review comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]