bkietz commented on a change in pull request #9643: URL: https://github.com/apache/arrow/pull/9643#discussion_r591684737
########## File path: cpp/src/arrow/util/vector.h ########## @@ -81,5 +84,53 @@ std::vector<T> FilterVector(std::vector<T> values, Predicate&& predicate) { return values; } +/// \brief Like MapVector, but where the function can fail. +template <typename Fn, typename From = internal::call_traits::argument_type<0, Fn>, + typename To = typename internal::call_traits::return_type<Fn>::ValueType> +Result<std::vector<To>> MaybeMapVector(Fn map, const std::vector<From>& src) { + std::vector<To> out; + out.reserve(src.size()); + ARROW_RETURN_NOT_OK( + MaybeTransform(src.begin(), src.end(), std::back_inserter(out), map)); + return out; +} + +template <typename Fn, typename From = internal::call_traits::argument_type<0, Fn>, + typename To = typename internal::call_traits::return_type<Fn>> +std::vector<To> MapVector(Fn map, const std::vector<From>& source) { Review comment: ```suggestion std::vector<To> MapVector(Fn&& map, const std::vector<From>& source) { ``` ########## File path: cpp/src/arrow/util/type_fwd.h ########## @@ -17,14 +17,14 @@ #pragma once +#include <functional> + namespace arrow { namespace detail { struct Empty; -} // namespace detail +} Review comment: code style: revert this please ########## File path: cpp/src/arrow/util/type_fwd.h ########## @@ -17,14 +17,14 @@ #pragma once +#include <functional> + namespace arrow { namespace detail { struct Empty; -} // namespace detail +} -template <typename T = detail::Empty> -class Future; Review comment: Is it worth maintaining arrow/util/type_fwd.h? Maybe it can be inlined into arrow/type_fwd.h. 
In any case, it can wait for a follow-up ########## File path: cpp/src/arrow/util/type_fwd.h ########## @@ -17,14 +17,14 @@ #pragma once +#include <functional> Review comment: revert this; any code which depends on this line should directly include `<functional>` instead ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -21,23 +21,58 @@ #include "arrow/util/functional.h" #include "arrow/util/future.h" #include "arrow/util/iterator.h" +#include "arrow/util/logging.h" +#include "arrow/util/mutex.h" #include "arrow/util/optional.h" +#include "arrow/util/queue.h" #include "arrow/util/thread_pool.h" namespace arrow { +/* +The methods in this file create, modify, and utilize AsyncGenerator which is an iterator +of futures. This allows an asynchronous source (like file input) to be run through a +pipeline in the same way that iterators can be used to create pipelined workflows. + +In order to support pipeline parallelism we introduce the concept of asynchronous +reentrancy. This is different than synchronous reentrancy. With synchronous code a +function is reentrant if the function can be called again while a previous call to that +function is still running. Unless otherwise called out none of these generators are +synchronously reentrant. Care should be taken to avoid calling them in such a way (and +the utilities Visit/Collect/Await take care to do this). + +Asynchronous reentrancy on the other hand means the function is called again before the +future returned by the function completes (but after the call to get the future +completes). Some of these generators are async-reentrant while others (e.g. those that Review comment: ```suggestion future returned by the function is marked finished (but after the call to get the future returns). Some of these generators are async-reentrant while others (e.g.
those that ``` ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -65,6 +100,14 @@ Future<> VisitAsyncGenerator(AsyncGenerator<T> generator, return Loop(LoopBody{std::move(generator), std::move(visitor)}); } +/// \brief Waits for an async generator to complete, discarding results. +template <typename T> +Future<> AwaitAsyncGenerator(AsyncGenerator<T> generator) { + std::function<Status(T)> visitor = [](...) { return Status::OK(); }; + return VisitAsyncGenerator(generator, visitor); +} // namespace arrow Review comment: ```suggestion } ``` ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -21,23 +21,58 @@ #include "arrow/util/functional.h" #include "arrow/util/future.h" #include "arrow/util/iterator.h" +#include "arrow/util/logging.h" +#include "arrow/util/mutex.h" #include "arrow/util/optional.h" +#include "arrow/util/queue.h" #include "arrow/util/thread_pool.h" namespace arrow { +/* +The methods in this file create, modify, and utilize AsyncGenerator which is an iterator +of futures. This allows an asynchronous source (like file input) to be run through a +pipeline in the same way that iterators can be used to create pipelined workflows. + +In order to support pipeline parallelism we introduce the concept of asynchronous +reentrancy. This is different than synchronous reentrancy. With synchronous code a +function is reentrant if the function can be called again while a previous call to that +function is still running. Unless otherwise called out none of these generators are Review comment: ```suggestion function is still running. Unless otherwise specified none of these generators are ``` (just because "call" is overloaded here) ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -65,6 +100,14 @@ Future<> VisitAsyncGenerator(AsyncGenerator<T> generator, return Loop(LoopBody{std::move(generator), std::move(visitor)}); } +/// \brief Waits for an async generator to complete, discarding results. 
+template <typename T> +Future<> AwaitAsyncGenerator(AsyncGenerator<T> generator) { Review comment: ```suggestion Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) { ``` ########## File path: cpp/src/arrow/util/vector.h ########## @@ -81,5 +84,53 @@ std::vector<T> FilterVector(std::vector<T> values, Predicate&& predicate) { return values; } +/// \brief Like MapVector, but where the function can fail. +template <typename Fn, typename From = internal::call_traits::argument_type<0, Fn>, + typename To = typename internal::call_traits::return_type<Fn>::ValueType> +Result<std::vector<To>> MaybeMapVector(Fn map, const std::vector<From>& src) { + std::vector<To> out; + out.reserve(src.size()); + ARROW_RETURN_NOT_OK( + MaybeTransform(src.begin(), src.end(), std::back_inserter(out), map)); + return out; +} + +template <typename Fn, typename From = internal::call_traits::argument_type<0, Fn>, + typename To = typename internal::call_traits::return_type<Fn>> +std::vector<To> MapVector(Fn map, const std::vector<From>& source) { + std::vector<To> out; + out.reserve(source.size()); + std::transform(source.begin(), source.end(), std::back_inserter(out), map); + return out; +} + +template <typename T> +std::vector<T> FlattenVectors(const std::vector<std::vector<T>> vecs) { Review comment: ```suggestion std::vector<T> FlattenVectors(const std::vector<std::vector<T>>& vecs) { ``` ########## File path: cpp/src/arrow/util/vector.h ########## @@ -81,5 +84,53 @@ std::vector<T> FilterVector(std::vector<T> values, Predicate&& predicate) { return values; } +/// \brief Like MapVector, but where the function can fail. 
+template <typename Fn, typename From = internal::call_traits::argument_type<0, Fn>, + typename To = typename internal::call_traits::return_type<Fn>::ValueType> +Result<std::vector<To>> MaybeMapVector(Fn map, const std::vector<From>& src) { Review comment: ```suggestion Result<std::vector<To>> MaybeMapVector(Fn&& map, const std::vector<From>& src) { ``` ########## File path: cpp/src/arrow/util/iterator_test.cc ########## @@ -570,8 +616,48 @@ TEST(ReadaheadIterator, NextError) { // -------------------------------------------------------------------- // Asynchronous iterator tests +template <typename T> +class ReentrantChecker { + public: + explicit ReentrantChecker(AsyncGenerator<T> source) + : state_(std::make_shared<State>(std::move(source))) {} + + Future<T> operator()() { + if (state_->in.load()) { + state_->valid.store(false); + } + state_->in.store(true); + auto result = state_->source(); + return result.Then(Callback{state_}); + } + + void AssertValid() { Review comment: There are some cases below where AssertValid is not called (and would be difficult to call). Replacing this with a destructor will ensure that we always assert the checker is valid. ```suggestion ~ReentrantChecker() { ``` This is a bit inflexible, though; you might want to assert invalidity instead or assert sooner than the checker will fall out of scope. Instead, it might be better to move the validity flag outside this helper: ```c++ ReentrantChecker(AsyncGenerator<T> source, std::atomic<bool>* is_valid) : state_(std::make_shared<State>(std::move(source), is_valid)) {} ``` So that the flag's value can be asserted as needed: ```c++ std::atomic<bool> all_reentrant_checkers_valid(true); for (...) { sources[i] = ReentrantChecker<TestInt>(std::move(sources[i]), &all_reentrant_checkers_valid); } //... 
ASSERT_TRUE(all_reentrant_checkers_valid.load()); ``` ########## File path: cpp/src/arrow/csv/reader.cc ########## @@ -805,7 +798,7 @@ class SerialTableReader : public BaseTableReader { std::move(first_buffer)); while (true) { ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator.Next()); - if (maybe_block == IterationTraits<CSVBlock>::End()) { + if (IterationTraits<CSVBlock>::IsEnd(maybe_block)) { Review comment: I agree that this is more ergonomic. Additionally, please make it clear which is the intended single point of truth for detecting the end sentinel (::End() or ::IsEnd()) ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -21,23 +21,58 @@ #include "arrow/util/functional.h" #include "arrow/util/future.h" #include "arrow/util/iterator.h" +#include "arrow/util/logging.h" +#include "arrow/util/mutex.h" #include "arrow/util/optional.h" +#include "arrow/util/queue.h" #include "arrow/util/thread_pool.h" namespace arrow { +/* Review comment: Style nit: please use `//` ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -235,29 +541,209 @@ class ReadaheadGenerator { /// The source generator must be async-reentrant /// /// This generator itself is async-reentrant. +/// +/// This generator may queue up to max_readahead instances of T template <typename T> AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) { return ReadaheadGenerator<T>(std::move(source_generator), max_readahead); } -/// \brief Transforms an async generator using a transformer function returning a new -/// AsyncGenerator +/// \brief Creates a generator that will yield finished futures from a vector /// -/// The transform function here behaves exactly the same as the transform function in -/// MakeTransformedIterator and you can safely use the same transform function to -/// transform both synchronous and asynchronous streams. 
+/// This generator is async-reentrant +template <typename T> +AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) { + struct State { + explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {} + + std::vector<T> vec; + std::atomic<std::size_t> vec_idx; + }; + + auto state = std::make_shared<State>(std::move(vec)); + return [state]() { + auto idx = state->vec_idx.fetch_add(1); + if (idx >= state->vec.size()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + return Future<T>::MakeFinished(state->vec[idx]); + }; +} + +/// \see MakeMergeMapGenerator +template <typename T> +class MergeMapGenerator { + public: + explicit MergeMapGenerator(AsyncGenerator<AsyncGenerator<T>> source, + int max_subscriptions) + : state_(std::make_shared<State>(std::move(source), max_subscriptions)) {} + + Future<T> operator()() { + Future<T> waiting_future; + std::shared_ptr<DeliveredJob> delivered_job; + { + auto guard = state_->mutex.Lock(); + if (!state_->delivered_jobs.empty()) { + delivered_job = std::move(state_->delivered_jobs.front()); + state_->delivered_jobs.pop_front(); + } else if (state_->finished) { + return IterationTraits<T>::End(); + } else { + waiting_future = Future<T>::Make(); + state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future)); + } + } + if (delivered_job) { + delivered_job->deliverer().AddCallback(InnerCallback{state_, delivered_job->index}); + return std::move(delivered_job->value); + } + if (state_->first) { + state_->first = false; + for (std::size_t i = 0; i < state_->active_subscriptions.size(); i++) { + state_->source().AddCallback(OuterCallback{state_, i}); + } + } + return waiting_future; + } + + private: + struct DeliveredJob { + explicit DeliveredJob(AsyncGenerator<T> deliverer_, T value_, std::size_t index_) + : deliverer(deliverer_), value(value_), index(index_) {} + + AsyncGenerator<T> deliverer; + T value; + std::size_t index; + }; + + struct State { + 
State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions) + : source(std::move(source)), + active_subscriptions(max_subscriptions), + delivered_jobs(), + waiting_jobs(), + mutex(), + first(true), + source_exhausted(false), + finished(false), + num_active_subscriptions(max_subscriptions) {} + + AsyncGenerator<AsyncGenerator<T>> source; + // active_subscriptions and delivered_jobs will be bounded by max_subscriptions + std::vector<AsyncGenerator<T>> active_subscriptions; + std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs; + // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will provide the + // backpressure + std::deque<std::shared_ptr<Future<T>>> waiting_jobs; + util::Mutex mutex; + bool first; + bool source_exhausted; + bool finished; + int num_active_subscriptions; + }; + + struct InnerCallback { + void operator()(const Result<T>& maybe_next) { + bool finished = false; + Future<T> sink; + if (maybe_next.ok()) { + finished = IterationTraits<T>::IsEnd(*maybe_next); + { + auto guard = state->mutex.Lock(); + if (!finished) { + if (state->waiting_jobs.empty()) { + state->delivered_jobs.push_back(std::make_shared<DeliveredJob>( + state->active_subscriptions[index], *maybe_next, index)); + } else { + sink = std::move(*state->waiting_jobs.front()); + state->waiting_jobs.pop_front(); + } + } + } + } else { + finished = true; + } + if (finished) { + state->source().AddCallback(OuterCallback{state, index}); + } else if (sink.is_valid()) { + sink.MarkFinished(*maybe_next); + state->active_subscriptions[index]().AddCallback(*this); + } + } + std::shared_ptr<State> state; + std::size_t index; + }; + + struct OuterCallback { + void operator()(const Result<AsyncGenerator<T>>& maybe_next) { + bool should_purge = false; + bool should_continue = false; + { + auto guard = state->mutex.Lock(); + if (!maybe_next.ok() || IterationTraits<AsyncGenerator<T>>::IsEnd(*maybe_next)) { + state->source_exhausted = true; + if (--state->num_active_subscriptions 
== 0) { + state->finished = true; + should_purge = true; + } + } else { + state->active_subscriptions[index] = *maybe_next; + should_continue = true; + } + } + if (should_continue) { + (*maybe_next)().AddCallback(InnerCallback{state, index}); + } else if (should_purge) { + // At this point state->finished has been marked true so no one else + // will be interacting with waiting_jobs and we can iterate outside lock + while (!state->waiting_jobs.empty()) { + state->waiting_jobs.front()->MarkFinished(IterationTraits<T>::End()); + state->waiting_jobs.pop_front(); + } + } + } + std::shared_ptr<State> state; + std::size_t index; + }; + + std::shared_ptr<State> state_; +}; + +/// \brief Creates a generator that takes in a stream of generators and pulls from up to +/// max_subscriptions at a time /// -/// This generator is not async-reentrant -template <typename T, typename V> -AsyncGenerator<V> MakeAsyncGenerator(AsyncGenerator<T> generator, - Transformer<T, V> transformer) { - return TransformingGenerator<T, V>(generator, transformer); +/// Note: This is the equivalent of Rx::MergeMap. This may deliver items out of Review comment: I see the connection, but I'm not sure this is precisely analogous since there isn't a `map` function here. Readers unfamiliar with Rx::MergeMap might misunderstand. Could we just name this `MergedGenerator` or so?
########## File path: cpp/src/arrow/util/iterator_test.cc ########## @@ -570,8 +616,48 @@ TEST(ReadaheadIterator, NextError) { // -------------------------------------------------------------------- // Asynchronous iterator tests +template <typename T> +class ReentrantChecker { + public: + explicit ReentrantChecker(AsyncGenerator<T> source) + : state_(std::make_shared<State>(std::move(source))) {} + + Future<T> operator()() { + if (state_->in.load()) { + state_->valid.store(false); + } + state_->in.store(true); + auto result = state_->source(); + return result.Then(Callback{state_}); + } + + void AssertValid() { + EXPECT_EQ(true, state_->valid.load()) + << "The generator was accessed in a reentrant manner"; + } + + private: + struct State { + explicit State(AsyncGenerator<T> source_) + : source(std::move(source_)), in(false), valid(true) {} + + AsyncGenerator<T> source; + std::atomic<bool> in; Review comment: Nit: more descriptive name ```suggestion std::atomic<bool> generated_unfinished_future; ``` ########## File path: cpp/src/arrow/util/iterator_test.cc ########## @@ -589,15 +675,255 @@ TEST(TestAsyncUtil, Collect) { ASSERT_EQ(expected, collected_val); } +TEST(TestAsyncUtil, Map) { + std::vector<TestInt> input = {1, 2, 3}; + auto generator = AsyncVectorIt(input); + std::function<TestStr(const TestInt&)> mapper = [](const TestInt& in) { + return std::to_string(in.value); + }; + auto mapped = MakeMappedGenerator(std::move(generator), mapper); + std::vector<TestStr> expected{"1", "2", "3"}; + AssertAsyncGeneratorMatch(expected, mapped); +} + +TEST(TestAsyncUtil, MapAsync) { + std::vector<TestInt> input = {1, 2, 3}; + auto generator = AsyncVectorIt(input); + std::function<Future<TestStr>(const TestInt&)> mapper = [](const TestInt& in) { + return SleepAsync(1e-3).Then([in](const Result<detail::Empty>& empty) { + return TestStr(std::to_string(in.value)); + }); + }; + auto mapped = MakeMappedGenerator(std::move(generator), mapper); + std::vector<TestStr> expected{"1",
"2", "3"}; + AssertAsyncGeneratorMatch(expected, mapped); +} + +TEST(TestAsyncUtil, MapReentrant) { + std::vector<TestInt> input = {1, 2}; + auto source = AsyncVectorIt(input); + TrackingGenerator<TestInt> tracker(std::move(source)); + source = MakeTransferredGenerator(AsyncGenerator<TestInt>(tracker), + internal::GetCpuThreadPool()); + + std::atomic<int> map_tasks_running(0); + // Mapper blocks until signal, should start multiple map tasks + std::atomic<bool> can_proceed(false); + std::function<Future<TestStr>(const TestInt&)> mapper = + [&can_proceed, &map_tasks_running](const TestInt& in) -> Future<TestStr> { + auto fut = Future<TestStr>::Make(); + map_tasks_running.fetch_add(1); + std::thread([fut, in, &can_proceed]() mutable { + while (!can_proceed.load()) { + SleepABit(); + } + fut.MarkFinished(TestStr(std::to_string(in.value))); + }).detach(); + return fut; Review comment: Instead, could we use ```suggestion Future<> can_proceed; std::function<Future<TestStr>(const TestInt&)> mapper = [&](const TestInt& in) { map_tasks_running.fetch_add(1); return can_proceed.Then([in](...) { return TestStr(std::to_string(in.value)); }); ``` ? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org