lidavidm commented on a change in pull request #12662:
URL: https://github.com/apache/arrow/pull/12662#discussion_r834272703



##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -551,11 +551,131 @@ TEST_P(MergedGeneratorTestFixture, Merged) {
   ASSERT_EQ(expected, concat_set);
 }
 
+TEST_P(MergedGeneratorTestFixture, OuterSubscriptionEmpty) {
+  auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>({});
+  if (IsSlow()) {
+    gen = SlowdownABit(gen);
+  }
+  auto merged_gen = MakeMergedGenerator(gen, 10);
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto collected,
+                                CollectAsyncGenerator(std::move(merged_gen)));
+  ASSERT_TRUE(collected.empty());
+}
+
 TEST_P(MergedGeneratorTestFixture, MergedInnerFail) {
   auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
-      {MakeSource({1, 2, 3}), MakeFailingSource()});
+      {MakeSource({1, 2, 3}), FailsAt(MakeSource({1, 2, 3}), 1), 
MakeSource({1, 2, 3})});
   auto merged_gen = MakeMergedGenerator(gen, 10);
-  ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(merged_gen));
+  // Merged generator can be pulled async-reentrantly and we need to make
+  // sure, if it is, that all futures are marked complete, even if there is an 
error
+  std::vector<Future<TestInt>> futures;
+  for (int i = 0; i < 20; i++) {
+    futures.push_back(merged_gen());
+  }
+  // Items could come in any order so the only guarantee is that we see at 
least
+  // one item before the failure.  After the failure the behavior is undefined
+  // except that we know the futures must complete.
+  bool error_seen = false;
+  for (int i = 0; i < 20; i++) {
+    Future<TestInt> fut = futures[i];
+    ASSERT_TRUE(fut.Wait(arrow::kDefaultAssertFinishesWaitSeconds));
+    Status status = futures[i].status();
+    if (!status.ok()) {
+      ASSERT_GT(i, 0);
+      if (!error_seen) {
+        error_seen = true;
+        ASSERT_TRUE(status.IsInvalid());
+      }
+    }
+  }
+}
+
+TEST_P(MergedGeneratorTestFixture, MergedInnerFailCleanup) {

Review comment:
       It looks like this failed on AppVeyor:
   
   ```
   [ RUN      ] 
MergedGeneratorTests/MergedGeneratorTestFixture.MergedInnerFailCleanup/0
   [       OK ] 
MergedGeneratorTests/MergedGeneratorTestFixture.MergedInnerFailCleanup/0 (3 ms)
   [ RUN      ] 
MergedGeneratorTests/MergedGeneratorTestFixture.MergedInnerFailCleanup/1
   C:/projects/arrow/cpp/src/arrow/util/async_generator_test.cc(642): error: 
Failed
   'passing_task_gate->WaitForRunning(2)' failed with Invalid: Timed out 
waiting for tasks to launch
   [  FAILED  ] 
MergedGeneratorTests/MergedGeneratorTestFixture.MergedInnerFailCleanup/1, where 
GetParam() = true (20004 ms)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to