westonpace commented on a change in pull request #12662:
URL: https://github.com/apache/arrow/pull/12662#discussion_r832733327
##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -553,9 +553,137 @@ TEST_P(MergedGeneratorTestFixture, Merged) {
TEST_P(MergedGeneratorTestFixture, MergedInnerFail) {
auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
- {MakeSource({1, 2, 3}), MakeFailingSource()});
+ {MakeSource({1, 2, 3}), FailsAt(MakeSource({1, 2, 3}), 1),
MakeSource({1, 2, 3})});
auto merged_gen = MakeMergedGenerator(gen, 10);
- ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(merged_gen));
+ // Merged generator can be pulled async-reentrantly and we need to make
+ // sure, if it is, that all futures are marked complete, even if there is an error
+ std::vector<Future<TestInt>> futures;
+ for (int i = 0; i < 20; i++) {
+ futures.push_back(merged_gen());
+ }
+ // Items could come in any order so the only guarantee is that we see at least
+ // one item before the failure. After the failure the behavior is undefined
+ // except that we know the futures must complete.
+ bool error_seen = false;
+ for (int i = 0; i < 20; i++) {
+ Future<TestInt> fut = futures[i];
+ ASSERT_TRUE(fut.Wait(arrow::kDefaultAssertFinishesWaitSeconds));
+ Status status = futures[i].status();
+ if (!status.ok()) {
+ ASSERT_GT(i, 0);
+ if (!error_seen) {
+ error_seen = true;
+ ASSERT_TRUE(status.IsInvalid());
+ }
+ }
+ }
+}
+
+TEST_P(MergedGeneratorTestFixture, MergedInnerFailCleanup) {
+ // The purpose of this test is to ensure we do not emit an error until all outstanding
+ // futures have completed. This is part of the AsyncGenerator contract
+ std::shared_ptr<GatingTask> failing_task_gate = GatingTask::Make();
+ std::shared_ptr<GatingTask> passing_task_gate = GatingTask::Make();
+ // A passing inner source emits one item and then waits on a gate and then
+ // emits a terminal item.
+ //
+ // A failing inner source emits one item and then waits on a gate and then
+ // emits an error.
+ auto make_source = [&](bool fails) -> AsyncGenerator<TestInt> {
+ std::shared_ptr<std::atomic<int>> count =
std::make_shared<std::atomic<int>>(0);
+ if (fails) {
+ return [&, count]() -> Future<TestInt> {
+ int my_count = (*count)++;
+ // std::cout << "Fail:" + std::to_string(my_count) + "\n";
+ if (my_count == 1) {
+ // std::cout << "Waiting on fail gate\n";
+ failing_task_gate->Task()();
+ // std::cout << "returning error from fail\n";
+ return Status::Invalid("XYZ");
+ } else {
+ return SleepABitAsync().Then([] {
+ // std::cout << "returning value from fail\n";
+ return TestInt(0);
+ });
+ }
+ };
+ } else {
+ return [&, count]() -> Future<TestInt> {
+ int my_count = (*count)++;
+ // std::cout << "Pass:" + std::to_string(my_count) + "\n";
Review comment:
Thanks. Can't go releasing all my debugging secrets.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]