westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545213923



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successful completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will complete successfully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successfully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
   I agree with Antoine, although I don't know if ThenBoth is much clearer. My main concern is that a callback marked "StatusOnly" can clear the error state of the chain. For comparison, in Rx there are three types of callbacks:
   
   on_success - continues the chain with success
   on_failure - intercepts a failure in the chain and (attempts to) convert it to a success
   on_completed - intercepts success or failure but does not convert to success
   
   on_failure is equivalent to try/catch, while on_completed is equivalent to try/finally.
   
   StatusOnly makes me think of on_completed, so I would expect that it cannot clear the error state of the future. However, looking at the tests, it does actually clear the error state.
   
   Could we change the name to ThenFinally or WhenCompleted, and change the behavior so that a "StatusOnly" callback can't clear the error state?
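   
   To make the try/catch versus try/finally distinction concrete, here is a minimal sketch. It assumes a hypothetical Status stand-in and two hypothetical helpers, ThenOnFailure and ThenFinally; none of these are the Arrow API or the code in this PR, and futures are left out so that only the error-propagation rule is shown.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical, minimal stand-in for a status type (not arrow::Status).
class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status Error(std::string msg) { return Status(false, std::move(msg)); }

  bool ok() const { return ok_; }
  const std::string& message() const { return message_; }

 private:
  Status(bool ok, std::string msg) : ok_(ok), message_(std::move(msg)) {}

  bool ok_;
  std::string message_;
};

// try/catch semantics (hypothetical helper): the handler only sees failures,
// and whatever it returns replaces the upstream status, so it can turn a
// failure into a success.
Status ThenOnFailure(const Status& upstream,
                     const std::function<Status(const Status&)>& handler) {
  return upstream.ok() ? upstream : handler(upstream);
}

// try/finally semantics (hypothetical helper): the handler always runs but
// returns nothing, so the upstream status is forwarded unchanged and an
// error can never be cleared.
Status ThenFinally(const Status& upstream,
                   const std::function<void(const Status&)>& handler) {
  handler(upstream);
  return upstream;
}
```

   With these helpers, a ThenOnFailure handler that returns Status::OK() clears an upstream error, while a ThenFinally handler observes the same error and the error still reaches the next stage. The void return type is what enforces this: the handler has no way to hand back a replacement status.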




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

