westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r565958445
##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +615,74 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures,
return waiter->MoveFinishedFutures();
}
+template <typename T = detail::Empty>
+struct ControlFlow {
+ using BreakValueType = T;
+
+ bool IsBreak() const { return break_value_.has_value(); }
+
+ static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+ return std::move(*cf.break_value_);
+ }
+
+ mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+ template <typename T>
+ operator ControlFlow<T>() && { // NOLINT explicit
+ return {};
+ }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+ return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,
+ typename Control = typename detail::result_of_t<Iterate()>::ValueType,
+ typename BreakValueType = typename Control::BreakValueType>
+Future<BreakValueType> Loop(Iterate iterate) {
+ auto break_fut = Future<BreakValueType>::Make();
+
+ struct Callback {
+ bool CheckForTermination(const Result<Control>& maybe_control) {
+ if (!maybe_control.ok() || maybe_control->IsBreak()) {
+ Result<BreakValueType> maybe_break = maybe_control.Map(Control::MoveBreakValue);
+ break_fut.MarkFinished(std::move(maybe_break));
+ return true;
+ }
+ return false;
+ }
+
+ void operator()(const Result<Control>& maybe_control) && {
+ if (CheckForTermination(maybe_control)) return;
+
+ auto control_fut = iterate();
+ while (control_fut.is_finished()) {
+ // There's no need to AddCallback on a finished future; we can CheckForTermination
+ // now. This also avoids recursion and potential stack overflow.
Review comment:
I struggled to create a test case that reproduces this. It seems like it
should be reproducible, but even with a slow callback (so that MarkFinished
takes a long time) it was difficult to get a task to be added and then finish
at exactly the right moment. However, I have added some logic so that this
race condition should no longer be possible.
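
For illustration only, here is a minimal sketch of one way a check-then-register race can be closed, assuming the race in question sits between the `is_finished()` check and the later callback registration. Everything in it (`ToyFuture`, `TryAddCallback`, `LoopState`, `Drive`, `RunDemo`) is a hypothetical stand-in written for this note, not the code in this PR and not Arrow's `Future<T>`. The future's state is checked and the callback is registered under a single lock, and iterations that are already finished are consumed in a plain `while` loop instead of through nested callbacks.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical toy future (an assumption for this sketch, NOT arrow::Future).
class ToyFuture {
 public:
  // Registers `callback` only if the future is still pending. The check and
  // the registration share one lock, so MarkFinished cannot run between them.
  // Returns false if the future had already finished.
  bool TryAddCallback(std::function<void()> callback) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (finished_) return false;
    callbacks_.push_back(std::move(callback));
    return true;
  }

  void MarkFinished(bool should_break) {
    std::vector<std::function<void()>> to_run;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      finished_ = true;
      should_break_ = should_break;
      to_run.swap(callbacks_);
    }
    // Callbacks run outside the lock so they may inspect the future again.
    for (auto& cb : to_run) cb();
  }

  // Only meaningful once the future has finished.
  bool should_break() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return should_break_;
  }

 private:
  mutable std::mutex mutex_;
  bool finished_ = false;
  bool should_break_ = false;
  std::vector<std::function<void()>> callbacks_;
};

struct LoopState {
  std::function<std::shared_ptr<ToyFuture>()> iterate;
  int iterations = 0;
  bool done = false;
};

// Drives the loop. Finished futures are handled in the while loop (constant
// stack depth); a pending future gets a callback that resumes the loop later.
void Drive(const std::shared_ptr<LoopState>& state) {
  while (true) {
    std::shared_ptr<ToyFuture> fut = state->iterate();
    ++state->iterations;
    bool pending = fut->TryAddCallback([state, fut] {
      if (fut->should_break()) {
        state->done = true;
        return;
      }
      Drive(state);
    });
    if (pending) return;  // parked; the callback continues the loop
    if (fut->should_break()) {
      state->done = true;
      return;
    }
  }
}

// 100000 iterations: all finish synchronously except number 50000, which is
// left pending and finished by hand afterwards. Returns the iteration count.
int RunDemo() {
  auto state = std::make_shared<LoopState>();
  std::shared_ptr<ToyFuture> pending;
  int calls = 0;
  state->iterate = [&]() {
    auto fut = std::make_shared<ToyFuture>();
    ++calls;
    if (calls == 50000) {
      pending = fut;  // stays unfinished for now
    } else {
      fut->MarkFinished(/*should_break=*/calls == 100000);
    }
    return fut;
  };
  Drive(state);
  assert(!state->done);          // the loop is parked on the pending future
  pending->MarkFinished(false);  // the registered callback resumes the loop
  return state->done ? state->iterations : -1;
}
```

The sketch is single-threaded, so it shows the structure rather than reproducing the race itself. The point is that `TryAddCallback` leaves no window in which a future can finish after being observed as pending but before its callback is attached.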