westonpace commented on a change in pull request #12712:
URL: https://github.com/apache/arrow/pull/12712#discussion_r836965328
##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -261,6 +262,140 @@ TEST_P(TestRunSynchronously, PropagatedError) {
INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
::testing::Values(false, true));
+TEST(SerialExecutor, AsyncGenerator) {
+ std::vector<TestInt> values{1, 2, 3, 4, 5};
+ auto source = util::SlowdownABit(util::AsyncVectorIt(values));
+ Iterator<TestInt> iter =
SerialExecutor::RunGeneratorInSerialExecutor<TestInt>(
+ [&source](Executor* executor) {
+ return MakeMappedGenerator(source, [executor](const TestInt& ti) {
+ return DeferNotOk(executor->Submit([ti] { return ti; }));
+ });
+ });
+ ASSERT_OK_AND_ASSIGN(auto vec, iter.ToVector());
+ ASSERT_EQ(vec, values);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithFollowUp) {
+ // Sometimes a task will generate follow-up tasks. These should be run
+ // before the next task is started
+ bool follow_up_ran = false;
+ bool first = true;
+ Iterator<TestInt> iter =
+ SerialExecutor::RunGeneratorInSerialExecutor<TestInt>([&](Executor*
executor) {
+ return [=, &first, &follow_up_ran]() -> Future<TestInt> {
+ if (first) {
+ first = false;
+ Future<TestInt> item =
+ DeferNotOk(executor->Submit([] { return TestInt(0); }));
+ RETURN_NOT_OK(executor->Spawn([&] { follow_up_ran = true; }));
+ return item;
+ }
+ return DeferNotOk(executor->Submit([] { return
IterationEnd<TestInt>(); }));
+ };
+ });
+ ASSERT_FALSE(follow_up_ran);
+ ASSERT_OK_AND_EQ(TestInt(0), iter.Next());
+ ASSERT_FALSE(follow_up_ran);
+ ASSERT_OK_AND_EQ(IterationEnd<TestInt>(), iter.Next());
+ ASSERT_TRUE(follow_up_ran);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithAsyncFollowUp) {
+ // Simulates a situation where a user calls into the async generator, tasks (e.g. I/O
+ // readahead tasks) are spawned onto the I/O threadpool, the user gets a result, and
+ // then the I/O readahead tasks are completed while there is no calling thread in the
+ // async generator to hand the task off to (it should be queued up)
+ bool follow_up_ran = false;
+ bool first = true;
+ Executor* captured_executor;
+ Iterator<TestInt> iter =
+ SerialExecutor::RunGeneratorInSerialExecutor<TestInt>([&](Executor*
executor) {
+ return [=, &first, &captured_executor]() -> Future<TestInt> {
+ if (first) {
+ captured_executor = executor;
+ first = false;
+ return DeferNotOk(executor->Submit([] {
+ // I/O tasks would be scheduled at this point
+ return TestInt(0);
+ }));
+ }
+ return DeferNotOk(executor->Submit([] { return
IterationEnd<TestInt>(); }));
+ };
+ });
+ ASSERT_FALSE(follow_up_ran);
+ ASSERT_OK_AND_EQ(TestInt(0), iter.Next());
+ // I/O task completes and has reference to executor to submit continuation
+ ASSERT_OK(captured_executor->Spawn([&] { follow_up_ran = true; }));
+ // Follow-up task can't run right now because there is no thread in the executor
+ SleepABit();
+ ASSERT_FALSE(follow_up_ran);
+ // Follow-up should run as part of retrieving the next item
+ ASSERT_OK_AND_EQ(IterationEnd<TestInt>(), iter.Next());
+ ASSERT_TRUE(follow_up_ran);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithCleanup) {
+ // Sometimes a final task might generate follow-up tasks. Unlike other
follow-up
Review comment:
I see your point. The wording is very ambiguous. Usually "remaining
tasks" get run as part of the following call to `Next`. Since there is no
further call to `Next` after the last item, we have to run those tasks before
we return the final item. I've updated the comment to just point out what is
being tested and avoid making a comparison.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]