westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r564825292
##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +615,74 @@ inline std::vector<int> WaitForAny(const
std::vector<Future<T>*>& futures,
return waiter->MoveFinishedFutures();
}
+template <typename T = detail::Empty>
+struct ControlFlow {
+ using BreakValueType = T;
+
+ bool IsBreak() const { return break_value_.has_value(); }
+
+ static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+ return std::move(*cf.break_value_);
+ }
+
+ mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+ template <typename T>
+ operator ControlFlow<T>() && { // NOLINT explicit
+ return {};
+ }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+ return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,
+ typename Control = typename
detail::result_of_t<Iterate()>::ValueType,
+ typename BreakValueType = typename Control::BreakValueType>
+Future<BreakValueType> Loop(Iterate iterate) {
+ auto break_fut = Future<BreakValueType>::Make();
+
+ struct Callback {
+ bool CheckForTermination(const Result<Control>& maybe_control) {
+ if (!maybe_control.ok() || maybe_control->IsBreak()) {
+ Result<BreakValueType> maybe_break =
maybe_control.Map(Control::MoveBreakValue);
+ break_fut.MarkFinished(std::move(maybe_break));
+ return true;
+ }
+ return false;
+ }
+
+ void operator()(const Result<Control>& maybe_control) && {
+ if (CheckForTermination(maybe_control)) return;
+
+ auto control_fut = iterate();
+ while (control_fut.is_finished()) {
+ // There's no need to AddCallback on a finished future; we can
CheckForTermination
+ // now. This also avoids recursion and potential stack overflow.
Review comment:
There is a mutex inside of `AddCallback`. We could create a
`TryAddCallback` which returns false without running the callback if the future
is finished. This method could be guaranteed to never run the callback
synchronously. I will add this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]