bkietz commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r576262763
##########
File path: cpp/src/arrow/util/future.h
##########
@@ -512,7 +545,7 @@ class ARROW_MUST_USE_TYPE Future {
FRIEND_TEST(FutureRefTest, ChainRemoved);
FRIEND_TEST(FutureRefTest, TailRemoved);
FRIEND_TEST(FutureRefTest, HeadRemoved);
-};
+}; // namespace arrow
Review comment:
```suggestion
};
```
##########
File path: cpp/src/arrow/util/future.h
##########
@@ -581,4 +646,82 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures,
return waiter->MoveFinishedFutures();
}
+struct Continue {
+ template <typename T>
+ operator util::optional<T>() && { // NOLINT explicit
+ return {};
+ }
+};
+
+template <typename T = detail::Empty>
+util::optional<T> Break(T break_value = {}) {
+ return util::optional<T>{std::move(break_value)};
+}
+
+template <typename T = detail::Empty>
+using ControlFlow = util::optional<T>;
+
+/// \brief Loop through an asynchronous sequence
+///
+/// \param[in] iterate A generator of Future<ControlFlow<BreakValue>>. On completion of
+/// each yielded future the resulting ControlFlow will be examined. A Break will terminate
+/// the loop, while a Continue will re-invoke `iterate`. \return A future which will
+/// complete when a Future returned by iterate completes with a Break
+template <typename Iterate,
+ typename Control = typename detail::result_of_t<Iterate()>::ValueType,
+ typename BreakValueType = typename Control::value_type>
+Future<BreakValueType> Loop(Iterate iterate) {
+ auto break_fut = Future<BreakValueType>::Make();
+
+ struct Callback {
+ bool CheckForTermination(const Result<Control>& control_res) {
+ if (!control_res.ok()) {
+ break_fut.MarkFinished(control_res.status());
+ return true;
+ }
+ if (control_res->has_value()) {
+ break_fut.MarkFinished(*std::move(*control_res));
+ return true;
+ }
+ return false;
+ }
+
+ void operator()(const Result<Control>& maybe_control) && {
+ if (CheckForTermination(maybe_control)) return;
+
+ auto control_fut = iterate();
+ while (true) {
+ if (control_fut.is_finished()) {
+ // There's no need to AddCallback on a finished future; we can
+ // CheckForTermination now. This also avoids recursion and potential stack
+ // overflow.
+ if (CheckForTermination(control_fut.result())) return;
+
+ control_fut = iterate();
+ } else {
+ std::function<Callback()> callback_factory = [this]() { return *this; };
+ if (control_fut.TryAddCallback(callback_factory)) {
+ break;
+ }
+ // Else we tried to add a callback but someone had stolen in and marked the
+ // future finished so we can just resume iteration
+ }
Review comment:
Nit: this loop could be simplified a bit further:
https://github.com/westonpace/arrow/pull/4
##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -792,65 +855,98 @@ class ThreadedTableReader : public BaseTableReader {
ARROW_ASSIGN_OR_RAISE(auto istream_it,
io::MakeInputStreamIterator(input_,
read_options_.block_size));
- int32_t block_queue_size = thread_pool_->GetCapacity();
- ARROW_ASSIGN_OR_RAISE(auto rh_it,
- MakeReadaheadIterator(std::move(istream_it), block_queue_size));
- buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
+ // TODO: use io_executor_ here, see ARROW-11590
+ ARROW_ASSIGN_OR_RAISE(auto background_executor, internal::ThreadPool::Make(1));
+ ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
+ background_executor.get()));
+ AsyncGenerator<std::shared_ptr<Buffer>> wrapped_bg_it =
+ [bg_it, background_executor]() { return bg_it(); };
+
+ auto transferred_it =
+ MakeTransferredGenerator(std::move(wrapped_bg_it), cpu_executor_);
+
+ int32_t block_queue_size = cpu_executor_->GetCapacity();
+ auto rh_it = MakeReadaheadGenerator(std::move(transferred_it), block_queue_size);
+ buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(rh_it));
return Status::OK();
}
- Result<std::shared_ptr<Table>> Read() override {
- task_group_ = internal::TaskGroup::MakeThreaded(thread_pool_);
+ Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }
+
+ Future<std::shared_ptr<Table>> ReadAsync() override {
+ task_group_ = internal::TaskGroup::MakeThreaded(cpu_executor_);
+
+ auto self = shared_from_this();
+ return ProcessFirstBuffer().Then([self](const std::shared_ptr<Buffer> first_buffer) {
Review comment:
```suggestion
return ProcessFirstBuffer().Then([self](std::shared_ptr<Buffer> first_buffer) {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]