westonpace commented on a change in pull request #9095: URL: https://github.com/apache/arrow/pull/9095#discussion_r576334652
########## File path: cpp/src/arrow/util/future.h ########## @@ -581,4 +646,82 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures, return waiter->MoveFinishedFutures(); } +struct Continue { + template <typename T> + operator util::optional<T>() && { // NOLINT explicit + return {}; + } +}; + +template <typename T = detail::Empty> +util::optional<T> Break(T break_value = {}) { + return util::optional<T>{std::move(break_value)}; +} + +template <typename T = detail::Empty> +using ControlFlow = util::optional<T>; + +/// \brief Loop through an asynchronous sequence +/// +/// \param[in] iterate A generator of Future<ControlFlow<BreakValue>>. On completion of +/// each yielded future the resulting ControlFlow will be examined. A Break will terminate +/// the loop, while a Continue will re-invoke `iterate`. \return A future which will +/// complete when a Future returned by iterate completes with a Break +template <typename Iterate, + typename Control = typename detail::result_of_t<Iterate()>::ValueType, + typename BreakValueType = typename Control::value_type> +Future<BreakValueType> Loop(Iterate iterate) { + auto break_fut = Future<BreakValueType>::Make(); + + struct Callback { + bool CheckForTermination(const Result<Control>& control_res) { + if (!control_res.ok()) { + break_fut.MarkFinished(control_res.status()); + return true; + } + if (control_res->has_value()) { + break_fut.MarkFinished(*std::move(*control_res)); + return true; + } + return false; + } + + void operator()(const Result<Control>& maybe_control) && { + if (CheckForTermination(maybe_control)) return; + + auto control_fut = iterate(); + while (true) { + if (control_fut.is_finished()) { + // There's no need to AddCallback on a finished future; we can + // CheckForTermination now. This also avoids recursion and potential stack + // overflow. 
+ if (CheckForTermination(control_fut.result())) return; + + control_fut = iterate(); + } else { + std::function<Callback()> callback_factory = [this]() { return *this; }; + if (control_fut.TryAddCallback(callback_factory)) { + break; + } + // Else we tried to add a callback but someone had stolen in and marked the + // future finished so we can just resume iteration + } Review comment: I agree that is simpler; I merged it in. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org