westonpace commented on a change in pull request #12712:
URL: https://github.com/apache/arrow/pull/12712#discussion_r836964552
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -276,25 +277,100 @@ class ARROW_EXPORT SerialExecutor : public Executor {
return FutureToSync(fut);
}
+ template <typename T>
+ static Iterator<T> RunGeneratorInSerialExecutor(
Review comment:
I like that. I've updated the name.
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -276,25 +277,100 @@ class ARROW_EXPORT SerialExecutor : public Executor {
return FutureToSync(fut);
}
+ template <typename T>
Review comment:
Added a description.
##########
File path: cpp/src/arrow/testing/async_test_util.h
##########
@@ -20,12 +20,37 @@
#include <atomic>
#include <memory>
+#include "arrow/testing/gtest_util.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/future.h"
namespace arrow {
namespace util {
+template <typename T>
+AsyncGenerator<T> AsyncVectorIt(std::vector<T> v) {
+ return MakeVectorGenerator(std::move(v));
+}
+
+template <typename T>
+AsyncGenerator<T> FailsAt(AsyncGenerator<T> src, int failing_index) {
Review comment:
Renamed.
##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -261,6 +262,140 @@ TEST_P(TestRunSynchronously, PropagatedError) {
INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
::testing::Values(false, true));
+TEST(SerialExecutor, AsyncGenerator) {
+ std::vector<TestInt> values{1, 2, 3, 4, 5};
+ auto source = util::SlowdownABit(util::AsyncVectorIt(values));
+ Iterator<TestInt> iter = SerialExecutor::RunGeneratorInSerialExecutor<TestInt>(
+ [&source](Executor* executor) {
+ return MakeMappedGenerator(source, [executor](const TestInt& ti) {
+ return DeferNotOk(executor->Submit([ti] { return ti; }));
+ });
+ });
+ ASSERT_OK_AND_ASSIGN(auto vec, iter.ToVector());
+ ASSERT_EQ(vec, values);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithFollowUp) {
+ // Sometimes a task will generate follow-up tasks. These should be run
+ // before the next task is started
+ bool follow_up_ran = false;
+ bool first = true;
+ Iterator<TestInt> iter =
+ SerialExecutor::RunGeneratorInSerialExecutor<TestInt>([&](Executor* executor) {
+ return [=, &first, &follow_up_ran]() -> Future<TestInt> {
+ if (first) {
+ first = false;
+ Future<TestInt> item =
+ DeferNotOk(executor->Submit([] { return TestInt(0); }));
+ RETURN_NOT_OK(executor->Spawn([&] { follow_up_ran = true; }));
+ return item;
+ }
+ return DeferNotOk(executor->Submit([] { return IterationEnd<TestInt>(); }));
+ };
+ });
+ ASSERT_FALSE(follow_up_ran);
+ ASSERT_OK_AND_EQ(TestInt(0), iter.Next());
+ ASSERT_FALSE(follow_up_ran);
+ ASSERT_OK_AND_EQ(IterationEnd<TestInt>(), iter.Next());
+ ASSERT_TRUE(follow_up_ran);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithAsyncFollowUp) {
+ // Simulates a situation where a user calls into the async generator, tasks (e.g. I/O
+ // readahead tasks) are spawned onto the I/O threadpool, the user gets a result, and
+ // then the I/O readahead tasks are completed while there is no calling thread in the
+ // async generator to hand the task off to (it should be queued up)
+ bool follow_up_ran = false;
+ bool first = true;
+ Executor* captured_executor;
Review comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]