lidavidm commented on a change in pull request #12662:
URL: https://github.com/apache/arrow/pull/12662#discussion_r830484780



##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -553,9 +553,137 @@ TEST_P(MergedGeneratorTestFixture, Merged) {
 
 TEST_P(MergedGeneratorTestFixture, MergedInnerFail) {
   auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
-      {MakeSource({1, 2, 3}), MakeFailingSource()});
+      {MakeSource({1, 2, 3}), FailsAt(MakeSource({1, 2, 3}), 1), MakeSource({1, 2, 3})});
   auto merged_gen = MakeMergedGenerator(gen, 10);
-  ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(merged_gen));
+  // Merged generator can be pulled async-reentrantly and we need to make
+  // sure, if it is, that all futures are marked complete, even if there is an error
+  std::vector<Future<TestInt>> futures;
+  for (int i = 0; i < 20; i++) {
+    futures.push_back(merged_gen());
+  }
+  // Items could come in any order so the only guarantee is that we see at least
+  // one item before the failure.  After the failure the behavior is undefined
+  // except that we know the futures must complete.
+  bool error_seen = false;
+  for (int i = 0; i < 20; i++) {
+    Future<TestInt> fut = futures[i];
+    ASSERT_TRUE(fut.Wait(arrow::kDefaultAssertFinishesWaitSeconds));
+    Status status = futures[i].status();
+    if (!status.ok()) {
+      ASSERT_GT(i, 0);
+      if (!error_seen) {
+        error_seen = true;
+        ASSERT_TRUE(status.IsInvalid());
+      }
+    }
+  }
+}
+
+TEST_P(MergedGeneratorTestFixture, MergedInnerFailCleanup) {
+  // The purpose of this test is to ensure we do not emit an error until all outstanding
+  // futures have completed.  This is part of the AsyncGenerator contract
+  std::shared_ptr<GatingTask> failing_task_gate = GatingTask::Make();
+  std::shared_ptr<GatingTask> passing_task_gate = GatingTask::Make();
+  // A passing inner source emits one item and then waits on a gate and then
+  // emits a terminal item.
+  //
+  // A failing inner source emits one item and then waits on a gate and then
+  // emits an error.
+  auto make_source = [&](bool fails) -> AsyncGenerator<TestInt> {
+    std::shared_ptr<std::atomic<int>> count = std::make_shared<std::atomic<int>>(0);
+    if (fails) {
+      return [&, count]() -> Future<TestInt> {
+        int my_count = (*count)++;
+        // std::cout << "Fail:" + std::to_string(my_count) + "\n";
+        if (my_count == 1) {
+          // std::cout << "Waiting on fail gate\n";
+          failing_task_gate->Task()();
+          // std::cout << "returning error from fail\n";
+          return Status::Invalid("XYZ");
+        } else {
+          return SleepABitAsync().Then([] {
+            // std::cout << "returning value from fail\n";
+            return TestInt(0);
+          });
+        }
+      };
+    } else {
+      return [&, count]() -> Future<TestInt> {
+        int my_count = (*count)++;
+        // std::cout << "Pass:" + std::to_string(my_count) + "\n";

Review comment:
       nit: leftover commented-out debug code? (the `std::cout` lines)

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1058,18 +1109,73 @@ class MergedGenerator {
       return source();
     }
 
+    void SignalErrorUnlocked() {
+      broken = true;
+      // Empty any results that have arrived but not asked for.
+      while (!delivered_jobs.empty()) {
+        delivered_jobs.pop_front();
+      }
+    }
+
+    void Purge() {
+      while (!waiting_jobs.empty()) {
+        waiting_jobs.front()->MarkFinished(IterationEnd<T>());
+        waiting_jobs.pop_front();
+      }
+    }
+
+    void MarkFinished() {
+      all_finished.MarkFinished();
+      Purge();
+    }
+
+    // This is called outside the mutex but it is only ever called
+    // once and Future<>::AddCallback is thread-safe
+    void MarkFinalError(const Status& err, Future<T> maybe_sink) {
+      if (maybe_sink.is_valid()) {
+        // Someone is waiting for this error so lets mark it complete when
+        // all the work is done
+        // all_finished will get called by something with a strong pointer to state
+        // so we can safely capture this

Review comment:
       It doesn't look like we're capturing `this` here, though.

##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -1376,6 +1521,59 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
   }
 }
 
+TEST(TestAsyncUtil, ReadaheadFailedWaitForInFlight) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(20));
+  // If a failure causes an early end then we should not emit that failure
+  // until all in-flight futures have completed.  This is to prevent tasks from
+  // outliving the generator
+  std::atomic<int32_t> counter(0);
+  auto failure_gating_task = GatingTask::Make();
+  auto in_flight_gating_task = GatingTask::Make();
+  auto source = [&]() -> Future<TestInt> {
+    auto count = counter++;
+    return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
+      if (count == 0) {
+        failure_gating_task->Task()();
+        return Status::Invalid("X");
+      }
+      in_flight_gating_task->Task()();
+      // These are our in-flight tasks
+      return TestInt(0);
+    }));
+  };
+  auto readahead = MakeReadaheadGenerator<TestInt>(source, 10);
+  auto should_be_invalid = readahead();
+  ASSERT_OK(in_flight_gating_task->WaitForRunning(10));
+  ASSERT_OK(failure_gating_task->Unlock());
+  SleepABit();
+  // Can't be finished because in-flight tasks are still running
+  AssertNotFinished(should_be_invalid);
+  ASSERT_OK(in_flight_gating_task->Unlock());
+}

Review comment:
       Should we also assert, after unlocking the in-flight tasks, that `should_be_invalid` actually finishes with an Invalid status?

##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -553,9 +553,137 @@ TEST_P(MergedGeneratorTestFixture, Merged) {
 
 TEST_P(MergedGeneratorTestFixture, MergedInnerFail) {
   auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
-      {MakeSource({1, 2, 3}), MakeFailingSource()});
+      {MakeSource({1, 2, 3}), FailsAt(MakeSource({1, 2, 3}), 1), MakeSource({1, 2, 3})});
   auto merged_gen = MakeMergedGenerator(gen, 10);
-  ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(merged_gen));
+  // Merged generator can be pulled async-reentrantly and we need to make
+  // sure, if it is, that all futures are marked complete, even if there is an error
+  std::vector<Future<TestInt>> futures;
+  for (int i = 0; i < 20; i++) {
+    futures.push_back(merged_gen());
+  }
+  // Items could come in any order so the only guarantee is that we see at least
+  // one item before the failure.  After the failure the behavior is undefined
+  // except that we know the futures must complete.
+  bool error_seen = false;
+  for (int i = 0; i < 20; i++) {
+    Future<TestInt> fut = futures[i];
+    ASSERT_TRUE(fut.Wait(arrow::kDefaultAssertFinishesWaitSeconds));
+    Status status = futures[i].status();
+    if (!status.ok()) {
+      ASSERT_GT(i, 0);
+      if (!error_seen) {
+        error_seen = true;
+        ASSERT_TRUE(status.IsInvalid());
+      }
+    }
+  }
+}
+
+TEST_P(MergedGeneratorTestFixture, MergedInnerFailCleanup) {
+  // The purpose of this test is to ensure we do not emit an error until all outstanding
+  // futures have completed.  This is part of the AsyncGenerator contract
+  std::shared_ptr<GatingTask> failing_task_gate = GatingTask::Make();
+  std::shared_ptr<GatingTask> passing_task_gate = GatingTask::Make();
+  // A passing inner source emits one item and then waits on a gate and then
+  // emits a terminal item.
+  //
+  // A failing inner source emits one item and then waits on a gate and then
+  // emits an error.
+  auto make_source = [&](bool fails) -> AsyncGenerator<TestInt> {
+    std::shared_ptr<std::atomic<int>> count = std::make_shared<std::atomic<int>>(0);
+    if (fails) {

Review comment:
       nit: it seems the two branches could be consolidated if `fails` were captured by the lambda instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to