westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545363890



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successful completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will complete successfully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successfully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {
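
For reference, a minimal sketch of how the Then() rules documented in the hunk above could be exercised. This is an illustration only, not part of the patch; it assumes the patch is applied and relies on the existing Future<int>::Make(), MarkFinished() and Wait() APIs from future.h.

```
#include <iostream>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/future.h"

using arrow::Future;
using arrow::Result;
using arrow::Status;

int main() {
  // Illustration only (assumes this patch is applied).
  // The producer side creates the future and marks it finished later.
  auto fut = Future<int>::Make();

  // OnSuccess returns int, so Then() yields a Future<int> (the "V or Result<V>" rule).
  // OnFailure must continue with the same future type, so it returns Result<int>.
  Future<int> doubled = fut.Then(
      [](const int& value) { return value * 2; },
      [](const Status& error) { return Result<int>(error); });

  // This OnSuccess returns void, so the continuation is a Future<> (the "void" rule).
  auto done = doubled.Then(
      [](const int& value) { std::cout << "doubled: " << value << std::endl; });

  // Both continuations run inside MarkFinished, before it returns.
  fut.MarkFinished(21);
  done.Wait();
  return 0;
}
```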

Review comment:
       After talking with Ben we are just going to drop StatusOnly until we have more compelling use cases. In my mind it would be used for cleanup steps like you described (I'm not sure what you mean by out of order).
   
   For example...
   
   ```
   char* buffer = new char[1024 * 1024];
   auto fut = LoadBuffer(buffer)
                  .Then([buffer] { return ProcessBuffer(buffer); })
                  .ThenFinally([buffer] { delete[] buffer; });
   ```
   
   So `ThenFinally` would be called after `LoadBuffer` and `ProcessBuffer`, it would run regardless of whether they succeeded or failed, and it would not affect the final result. It would be equivalent to...
   
   ```
   .Then([buffer](const ProcessedBuffer& result) { delete[] buffer; return result; },
         [buffer](const Status& error_status) {
           delete[] buffer;
           return Result<ProcessedBuffer>(error_status);
         })
   ```
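   
   Or, as a rough free-function sketch layered on `Then()` (the `ThenFinally` name and shape here are hypothetical and only spell out the equivalence; it assumes `arrow::Future`, `arrow::Result` and `arrow::Status` are in scope):
   
   ```
   // Hypothetical helper, not part of this patch: run `on_finally` on both paths
   // and pass the original value or error through unchanged.
   template <typename T, typename OnFinally>
   Future<T> ThenFinally(const Future<T>& fut, OnFinally on_finally) {
     return fut.Then(
         [on_finally](const T& value) mutable {
           on_finally();              // cleanup on success
           return Result<T>(value);   // forward the value unchanged
         },
         [on_finally](const Status& status) mutable {
           on_finally();              // cleanup on failure
           return Result<T>(status);  // forward the error unchanged
         });
   }
   ```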



