bkietz commented on a change in pull request #9643:
URL: https://github.com/apache/arrow/pull/9643#discussion_r591684737



##########
File path: cpp/src/arrow/util/vector.h
##########
@@ -81,5 +84,53 @@ std::vector<T> FilterVector(std::vector<T> values, 
Predicate&& predicate) {
   return values;
 }
 
+/// \brief Like MapVector, but where the function can fail.
+template <typename Fn, typename From = internal::call_traits::argument_type<0, 
Fn>,
+          typename To = typename 
internal::call_traits::return_type<Fn>::ValueType>
+Result<std::vector<To>> MaybeMapVector(Fn map, const std::vector<From>& src) {
+  std::vector<To> out;
+  out.reserve(src.size());
+  ARROW_RETURN_NOT_OK(
+      MaybeTransform(src.begin(), src.end(), std::back_inserter(out), map));
+  return out;
+}
+
+template <typename Fn, typename From = internal::call_traits::argument_type<0, 
Fn>,
+          typename To = typename internal::call_traits::return_type<Fn>>
+std::vector<To> MapVector(Fn map, const std::vector<From>& source) {

Review comment:
       ```suggestion
   std::vector<To> MapVector(Fn&& map, const std::vector<From>& source) {
   ```

##########
File path: cpp/src/arrow/util/type_fwd.h
##########
@@ -17,14 +17,14 @@
 
 #pragma once
 
+#include <functional>
+
 namespace arrow {
 
 namespace detail {
 struct Empty;
-}  // namespace detail
+}

Review comment:
       code style: revert this please

##########
File path: cpp/src/arrow/util/type_fwd.h
##########
@@ -17,14 +17,14 @@
 
 #pragma once
 
+#include <functional>
+
 namespace arrow {
 
 namespace detail {
 struct Empty;
-}  // namespace detail
+}
 
-template <typename T = detail::Empty>
-class Future;

Review comment:
       Is it worth maintaining arrow/util/type_fwd.h? Maybe it can be inlined 
into arrow/type_fwd.h. In any case, it can wait for a follow-up.

##########
File path: cpp/src/arrow/util/type_fwd.h
##########
@@ -17,14 +17,14 @@
 
 #pragma once
 
+#include <functional>

Review comment:
       revert this; any code which depends on this line should directly include 
`<functional>` instead

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -21,23 +21,58 @@
 #include "arrow/util/functional.h"
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
 #include "arrow/util/thread_pool.h"
 
 namespace arrow {
 
+/*
+The methods in this file create, modify, and utilize AsyncGenerator which is 
an iterator
+of futures.  This allows an asynchronous source (like file input) to be run 
through a
+pipeline in the same way that iterators can be used to create pipelined 
workflows.
+
+In order to support pipeline parallelism we introduce the concept of 
asynchronous
+reentrancy. This is different than synchronous reentrancy.  With synchronous 
code a
+function is reentrant if the function can be called again while a previous 
call to that
+function is still running.  Unless otherwise called out none of these 
generators are
+synchronously reentrant.  Care should be taken to avoid calling them in such a 
way (and
+the utilities Visit/Collect/Await take care to do this).
+
+Asynchronous reentrancy on the other hand means the function is called again 
before the
+future returned by the function completes (but after the call to get the future
+completes).  Some of these generators are async-reentrant while others (e.g. 
those that

Review comment:
       ```suggestion
   future returned by the function is marked finished (but after the call to 
get the future
   returns).  Some of these generators are async-reentrant while others (e.g. 
those that
   ```

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -65,6 +100,14 @@ Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
   return Loop(LoopBody{std::move(generator), std::move(visitor)});
 }
 
+/// \brief Waits for an async generator to complete, discarding results.
+template <typename T>
+Future<> AwaitAsyncGenerator(AsyncGenerator<T> generator) {
+  std::function<Status(T)> visitor = [](...) { return Status::OK(); };
+  return VisitAsyncGenerator(generator, visitor);
+}  // namespace arrow

Review comment:
       ```suggestion
   }
   ```

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -21,23 +21,58 @@
 #include "arrow/util/functional.h"
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
 #include "arrow/util/thread_pool.h"
 
 namespace arrow {
 
+/*
+The methods in this file create, modify, and utilize AsyncGenerator which is 
an iterator
+of futures.  This allows an asynchronous source (like file input) to be run 
through a
+pipeline in the same way that iterators can be used to create pipelined 
workflows.
+
+In order to support pipeline parallelism we introduce the concept of 
asynchronous
+reentrancy. This is different than synchronous reentrancy.  With synchronous 
code a
+function is reentrant if the function can be called again while a previous 
call to that
+function is still running.  Unless otherwise called out none of these 
generators are

Review comment:
       ```suggestion
   function is still running.  Unless otherwise specified none of these 
generators are
   ```
   (just because "call" is overloaded here)

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -65,6 +100,14 @@ Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
   return Loop(LoopBody{std::move(generator), std::move(visitor)});
 }
 
+/// \brief Waits for an async generator to complete, discarding results.
+template <typename T>
+Future<> AwaitAsyncGenerator(AsyncGenerator<T> generator) {

Review comment:
       ```suggestion
   Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) {
   ```

##########
File path: cpp/src/arrow/util/vector.h
##########
@@ -81,5 +84,53 @@ std::vector<T> FilterVector(std::vector<T> values, 
Predicate&& predicate) {
   return values;
 }
 
+/// \brief Like MapVector, but where the function can fail.
+template <typename Fn, typename From = internal::call_traits::argument_type<0, 
Fn>,
+          typename To = typename 
internal::call_traits::return_type<Fn>::ValueType>
+Result<std::vector<To>> MaybeMapVector(Fn map, const std::vector<From>& src) {
+  std::vector<To> out;
+  out.reserve(src.size());
+  ARROW_RETURN_NOT_OK(
+      MaybeTransform(src.begin(), src.end(), std::back_inserter(out), map));
+  return out;
+}
+
+template <typename Fn, typename From = internal::call_traits::argument_type<0, 
Fn>,
+          typename To = typename internal::call_traits::return_type<Fn>>
+std::vector<To> MapVector(Fn map, const std::vector<From>& source) {
+  std::vector<To> out;
+  out.reserve(source.size());
+  std::transform(source.begin(), source.end(), std::back_inserter(out), map);
+  return out;
+}
+
+template <typename T>
+std::vector<T> FlattenVectors(const std::vector<std::vector<T>> vecs) {

Review comment:
       ```suggestion
   std::vector<T> FlattenVectors(const std::vector<std::vector<T>>& vecs) {
   ```

##########
File path: cpp/src/arrow/util/vector.h
##########
@@ -81,5 +84,53 @@ std::vector<T> FilterVector(std::vector<T> values, 
Predicate&& predicate) {
   return values;
 }
 
+/// \brief Like MapVector, but where the function can fail.
+template <typename Fn, typename From = internal::call_traits::argument_type<0, 
Fn>,
+          typename To = typename 
internal::call_traits::return_type<Fn>::ValueType>
+Result<std::vector<To>> MaybeMapVector(Fn map, const std::vector<From>& src) {

Review comment:
       ```suggestion
   Result<std::vector<To>> MaybeMapVector(Fn&& map, const std::vector<From>& 
src) {
   ```

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -570,8 +616,48 @@ TEST(ReadaheadIterator, NextError) {
 // --------------------------------------------------------------------
 // Asynchronous iterator tests
 
+template <typename T>
+class ReentrantChecker {
+ public:
+  explicit ReentrantChecker(AsyncGenerator<T> source)
+      : state_(std::make_shared<State>(std::move(source))) {}
+
+  Future<T> operator()() {
+    if (state_->in.load()) {
+      state_->valid.store(false);
+    }
+    state_->in.store(true);
+    auto result = state_->source();
+    return result.Then(Callback{state_});
+  }
+
+  void AssertValid() {

Review comment:
       There are some cases below where AssertValid is not called (and would be 
difficult to call). Replacing this with a destructor will ensure that we always 
assert the checker is valid.
   ```suggestion
     ~ReentrantChecker() {
   ```
   
   This is a bit inflexible, though; you might want to assert invalidity 
instead or assert sooner than the checker will fall out of scope. Instead, it 
might be better to move the validity flag outside this helper:
   
   ```c++
     ReentrantChecker(AsyncGenerator<T> source, std::atomic<bool>* is_valid)
         : state_(std::make_shared<State>(std::move(source), is_valid)) {}
   ```
   
   So that the flag's value can be asserted as needed:
   
   ```c++
   std::atomic<bool> all_reentrant_checkers_valid(true);
   for (...) {
     sources[i] = ReentrantChecker<TestInt>(std::move(sources[i]), 
&all_reentrant_checkers_valid);
   }
   //...
   ASSERT_TRUE(all_reentrant_checkers_valid.load());
   ```

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -805,7 +798,7 @@ class SerialTableReader : public BaseTableReader {
                                                           
std::move(first_buffer));
     while (true) {
       ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator.Next());
-      if (maybe_block == IterationTraits<CSVBlock>::End()) {
+      if (IterationTraits<CSVBlock>::IsEnd(maybe_block)) {

Review comment:
       I agree that this is more ergonomic. Additionally, please make it clear 
which is the intended single point of truth for detecting the end sentinel 
(::End() or ::IsEnd())

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -21,23 +21,58 @@
 #include "arrow/util/functional.h"
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
 #include "arrow/util/thread_pool.h"
 
 namespace arrow {
 
+/*

Review comment:
       Style nit: please use `//`

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -235,29 +541,209 @@ class ReadaheadGenerator {
 /// The source generator must be async-reentrant
 ///
 /// This generator itself is async-reentrant.
+///
+/// This generator may queue up to max_readahead instances of T
 template <typename T>
 AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
                                          int max_readahead) {
   return ReadaheadGenerator<T>(std::move(source_generator), max_readahead);
 }
 
-/// \brief Transforms an async generator using a transformer function 
returning a new
-/// AsyncGenerator
+/// \brief Creates a generator that will yield finished futures from a vector
 ///
-/// The transform function here behaves exactly the same as the transform 
function in
-/// MakeTransformedIterator and you can safely use the same transform function 
to
-/// transform both synchronous and asynchronous streams.
+/// This generator is async-reentrant
+template <typename T>
+AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
+  struct State {
+    explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}
+
+    std::vector<T> vec;
+    std::atomic<std::size_t> vec_idx;
+  };
+
+  auto state = std::make_shared<State>(std::move(vec));
+  return [state]() {
+    auto idx = state->vec_idx.fetch_add(1);
+    if (idx >= state->vec.size()) {
+      return Future<T>::MakeFinished(IterationTraits<T>::End());
+    }
+    return Future<T>::MakeFinished(state->vec[idx]);
+  };
+}
+
+/// \see MakeMergeMapGenerator
+template <typename T>
+class MergeMapGenerator {
+ public:
+  explicit MergeMapGenerator(AsyncGenerator<AsyncGenerator<T>> source,
+                             int max_subscriptions)
+      : state_(std::make_shared<State>(std::move(source), max_subscriptions)) 
{}
+
+  Future<T> operator()() {
+    Future<T> waiting_future;
+    std::shared_ptr<DeliveredJob> delivered_job;
+    {
+      auto guard = state_->mutex.Lock();
+      if (!state_->delivered_jobs.empty()) {
+        delivered_job = std::move(state_->delivered_jobs.front());
+        state_->delivered_jobs.pop_front();
+      } else if (state_->finished) {
+        return IterationTraits<T>::End();
+      } else {
+        waiting_future = Future<T>::Make();
+        
state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
+      }
+    }
+    if (delivered_job) {
+      delivered_job->deliverer().AddCallback(InnerCallback{state_, 
delivered_job->index});
+      return std::move(delivered_job->value);
+    }
+    if (state_->first) {
+      state_->first = false;
+      for (std::size_t i = 0; i < state_->active_subscriptions.size(); i++) {
+        state_->source().AddCallback(OuterCallback{state_, i});
+      }
+    }
+    return waiting_future;
+  }
+
+ private:
+  struct DeliveredJob {
+    explicit DeliveredJob(AsyncGenerator<T> deliverer_, T value_, std::size_t 
index_)
+        : deliverer(deliverer_), value(value_), index(index_) {}
+
+    AsyncGenerator<T> deliverer;
+    T value;
+    std::size_t index;
+  };
+
+  struct State {
+    State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
+        : source(std::move(source)),
+          active_subscriptions(max_subscriptions),
+          delivered_jobs(),
+          waiting_jobs(),
+          mutex(),
+          first(true),
+          source_exhausted(false),
+          finished(false),
+          num_active_subscriptions(max_subscriptions) {}
+
+    AsyncGenerator<AsyncGenerator<T>> source;
+    // active_subscriptions and delivered_jobs will be bounded by 
max_subscriptions
+    std::vector<AsyncGenerator<T>> active_subscriptions;
+    std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
+    // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will 
provide the
+    // backpressure
+    std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
+    util::Mutex mutex;
+    bool first;
+    bool source_exhausted;
+    bool finished;
+    int num_active_subscriptions;
+  };
+
+  struct InnerCallback {
+    void operator()(const Result<T>& maybe_next) {
+      bool finished = false;
+      Future<T> sink;
+      if (maybe_next.ok()) {
+        finished = IterationTraits<T>::IsEnd(*maybe_next);
+        {
+          auto guard = state->mutex.Lock();
+          if (!finished) {
+            if (state->waiting_jobs.empty()) {
+              state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
+                  state->active_subscriptions[index], *maybe_next, index));
+            } else {
+              sink = std::move(*state->waiting_jobs.front());
+              state->waiting_jobs.pop_front();
+            }
+          }
+        }
+      } else {
+        finished = true;
+      }
+      if (finished) {
+        state->source().AddCallback(OuterCallback{state, index});
+      } else if (sink.is_valid()) {
+        sink.MarkFinished(*maybe_next);
+        state->active_subscriptions[index]().AddCallback(*this);
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  struct OuterCallback {
+    void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
+      bool should_purge = false;
+      bool should_continue = false;
+      {
+        auto guard = state->mutex.Lock();
+        if (!maybe_next.ok() || 
IterationTraits<AsyncGenerator<T>>::IsEnd(*maybe_next)) {
+          state->source_exhausted = true;
+          if (--state->num_active_subscriptions == 0) {
+            state->finished = true;
+            should_purge = true;
+          }
+        } else {
+          state->active_subscriptions[index] = *maybe_next;
+          should_continue = true;
+        }
+      }
+      if (should_continue) {
+        (*maybe_next)().AddCallback(InnerCallback{state, index});
+      } else if (should_purge) {
+        // At this point state->finished has been marked true so no one else
+        // will be interacting with waiting_jobs and we can iterate outside 
lock
+        while (!state->waiting_jobs.empty()) {
+          state->waiting_jobs.front()->MarkFinished(IterationTraits<T>::End());
+          state->waiting_jobs.pop_front();
+        }
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Creates a generator that takes in a stream of generators and pulls 
from up to
+/// max_subscriptions at a time
 ///
-/// This generator is not async-reentrant
-template <typename T, typename V>
-AsyncGenerator<V> MakeAsyncGenerator(AsyncGenerator<T> generator,
-                                     Transformer<T, V> transformer) {
-  return TransformingGenerator<T, V>(generator, transformer);
+/// Note: This is the equivalent of Rx::MergeMap.  This may deliver items out 
of

Review comment:
       I see the connection, but I'm not sure this is precisely analogous since 
there isn't a `map` function here. Readers unfamiliar with Rx::MergeMap might 
misunderstand. Could we just name this `MergedGenerator` or so?

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -570,8 +616,48 @@ TEST(ReadaheadIterator, NextError) {
 // --------------------------------------------------------------------
 // Asynchronous iterator tests
 
+template <typename T>
+class ReentrantChecker {
+ public:
+  explicit ReentrantChecker(AsyncGenerator<T> source)
+      : state_(std::make_shared<State>(std::move(source))) {}
+
+  Future<T> operator()() {
+    if (state_->in.load()) {
+      state_->valid.store(false);
+    }
+    state_->in.store(true);
+    auto result = state_->source();
+    return result.Then(Callback{state_});
+  }
+
+  void AssertValid() {
+    EXPECT_EQ(true, state_->valid.load())
+        << "The generator was accessed in a reentrant manner";
+  }
+
+ private:
+  struct State {
+    explicit State(AsyncGenerator<T> source_)
+        : source(std::move(source_)), in(false), valid(true) {}
+
+    AsyncGenerator<T> source;
+    std::atomic<bool> in;

Review comment:
       Nit: more descriptive name
   ```suggestion
       std::atomic<bool> generated_unfinished_future;
   ```

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -589,15 +675,255 @@ TEST(TestAsyncUtil, Collect) {
   ASSERT_EQ(expected, collected_val);
 }
 
+TEST(TestAsyncUtil, Map) {
+  std::vector<TestInt> input = {1, 2, 3};
+  auto generator = AsyncVectorIt(input);
+  std::function<TestStr(const TestInt&)> mapper = [](const TestInt& in) {
+    return std::to_string(in.value);
+  };
+  auto mapped = MakeMappedGenerator(std::move(generator), mapper);
+  std::vector<TestStr> expected{"1", "2", "3"};
+  AssertAsyncGeneratorMatch(expected, mapped);
+}
+
+TEST(TestAsyncUtil, MapAsync) {
+  std::vector<TestInt> input = {1, 2, 3};
+  auto generator = AsyncVectorIt(input);
+  std::function<Future<TestStr>(const TestInt&)> mapper = [](const TestInt& 
in) {
+    return SleepAsync(1e-3).Then([in](const Result<detail::Empty>& empty) {
+      return TestStr(std::to_string(in.value));
+    });
+  };
+  auto mapped = MakeMappedGenerator(std::move(generator), mapper);
+  std::vector<TestStr> expected{"1", "2", "3"};
+  AssertAsyncGeneratorMatch(expected, mapped);
+}
+
+TEST(TestAsyncUtil, MapReentrant) {
+  std::vector<TestInt> input = {1, 2};
+  auto source = AsyncVectorIt(input);
+  TrackingGenerator<TestInt> tracker(std::move(source));
+  source = MakeTransferredGenerator(AsyncGenerator<TestInt>(tracker),
+                                    internal::GetCpuThreadPool());
+
+  std::atomic<int> map_tasks_running(0);
+  // Mapper blocks until signal, should start multiple map tasks
+  std::atomic<bool> can_proceed(false);
+  std::function<Future<TestStr>(const TestInt&)> mapper =
+      [&can_proceed, &map_tasks_running](const TestInt& in) -> Future<TestStr> 
{
+    auto fut = Future<TestStr>::Make();
+    map_tasks_running.fetch_add(1);
+    std::thread([fut, in, &can_proceed]() mutable {
+      while (!can_proceed.load()) {
+        SleepABit();
+      }
+      fut.MarkFinished(TestStr(std::to_string(in.value)));
+    }).detach();
+    return fut;

Review comment:
       Instead, could we use
   ```suggestion
     Future<> can_proceed;
     std::function<Future<TestStr>(const TestInt&)> mapper = [&](const TestInt& 
in) {
       map_tasks_running.fetch_add(1);
       return can_proceed.Then([in](...) {
         return TestStr(std::to_string(in.value));
       });
   ```
   ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to