pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534295329



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -243,6 +253,12 @@ class ConcreteFutureImpl : public FutureImpl {
       }
     }
     cv_.notify_all();
+
+    // run callbacks
+    for (auto&& callback : callbacks_) {

Review comment:
       This isn't safe without the mutex held. But of course we don't want to 
call the callback while locked. The solution is probably to move the callbacks 
to another vector while locked.

##########
File path: cpp/src/arrow/util/task_group.h
##########
@@ -56,21 +67,13 @@ class ARROW_EXPORT TaskGroup : public 
std::enable_shared_from_this<TaskGroup> {
   /// The current aggregate error Status.  Non-blocking, useful for stopping 
early.
   virtual Status current_status() = 0;
 
-  /// Whether some tasks have already failed.  Non-blocking , useful for 
stopping early.
+  /// Whether some tasks have already failed.  Non-blocking, useful for 
stopping early.
   virtual bool ok() = 0;
 
   /// How many tasks can typically be executed in parallel.
   /// This is only a hint, useful for testing or debugging.
   virtual int parallelism() = 0;
 
-  /// Create a subgroup of this group.  This group can only finish
-  /// when all subgroups have finished (this means you must be
-  /// be careful to call Finish() on subgroups before calling it
-  /// on the main group).
-  // XXX if a subgroup errors out, should it propagate immediately to the 
parent
-  // and to children?
-  virtual std::shared_ptr<TaskGroup> MakeSubGroup() = 0;

Review comment:
       Thanks for removing this :-)

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted 
if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return 
Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return 
Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) 
{
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain 
alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing 
will finish
+  // it.  In theory the intermediate futures could be finished by some 
external process
+  // but that would be highly unusual and bad practice so in reality this 
would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = 
result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = 
result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {

Review comment:
       Hmm... what sense does it make to take a `Result<int>` if the callback 
is not executed on failure?
   (same comment for the above test snippet)

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted 
if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return 
Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return 
Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) 
{
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain 
alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing 
will finish
+  // it.  In theory the intermediate futures could be finished by some 
external process
+  // but that would be highly unusual and bad practice so in reality this 
would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = 
result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = 
result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& 
result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& 
s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int i) { return std::to_string(i); },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void

Review comment:
       This test case seems to duplicate one already in `FutureCompletionTest::Void`.

##########
File path: cpp/src/arrow/util/functional.h
##########
@@ -79,5 +84,47 @@ struct call_traits {
       typename std::enable_if<std::is_same<return_type<F>, T>::value, RT>;
 };
 
+/// A type erased callable object which may only be invoked once.
+/// It can be constructed from any lambda which matches the provided call 
signature.
+/// Invoking it results in destruction of the lambda, freeing any 
state/references
+/// immediately. Invoking a default constructed FnOnce or one which has 
already been
+/// invoked will segfault.
+template <typename Signature>
+class FnOnce;
+
+template <typename R, typename... A>
+class FnOnce<R(A...)> {
+ public:
+  FnOnce() = default;
+
+  template <typename Fn,
+            typename = typename std::enable_if<std::is_convertible<
+                typename std::result_of<Fn && (A...)>::type, R>::value>::type>
+  FnOnce(Fn fn) : impl_(new FnImpl<Fn>(std::move(fn))) {  // NOLINT 
runtime/explicit
+  }
+
+  explicit operator bool() const { return impl_ != NULLPTR; }
+
+  R operator()(A... a) && {
+    auto bye = std::move(impl_);
+    return bye->invoke(static_cast<A&&>(a)...);

Review comment:
       Why not use `std::forward<A>(a)...` to forward the arguments here?

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -229,6 +229,16 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
+  void AddCallback(Callback callback) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    if (IsFutureFinished(state_)) {
+      lock.unlock();
+      std::move(callback)();

Review comment:
       This callback could be run before the other ones, if 
`DoMarkFinishedOrFailed` is still running. Is that a concern?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, 
detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future 
completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at 
least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to 
keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future 
completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished 
(whatever
+  /// callback is registered with this function will run before MarkFinished 
returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an 
Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul 
completion.
+  /// - OnFailure, called against the error (const Status&) on failed 
completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type 
of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as 
soon

Review comment:
       Does "produced" mean "returned" here?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, 
detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future 
completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at 
least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to 
keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future 
completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished 
(whatever
+  /// callback is registered with this function will run before MarkFinished 
returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an 
Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul 
completion.
+  /// - OnFailure, called against the error (const Status&) on failed 
completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type 
of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as 
soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the 
returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with 
the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete 
when the
+  ///   future returned by the callback completes (and will complete with the 
same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to 
successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run 
immediately
+  /// (before this method returns) and the returned future may already be 
marked complete
+  /// (it will definitely be marked complete if the callback returns a 
non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const 
Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), 
result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), 
result.status());

Review comment:
       You should move both callbacks in each branch, shouldn't you?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -19,63 +19,150 @@
 
 #include <atomic>
 #include <cmath>
+#include <functional>
 #include <memory>
 #include <type_traits>
 #include <utility>
 #include <vector>
 
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/functional.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/type_fwd.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
+namespace detail {
+
+struct Empty {
+  static Result<Empty> ToResult(Status s) {
+    if (ARROW_PREDICT_TRUE(s.ok())) {
+      return Empty{};
+    }
+    return s;
+  }
+};
+
+template <typename>
+struct is_future : std::false_type {};
+
+template <typename T>
+struct is_future<Future<T>> : std::true_type {};
+
+template <typename Signature>
+using result_of_t = typename std::result_of<Signature>::type;
+
+constexpr struct ContinueFuture {
+  template <typename Return>
+  struct ForReturnImpl;
+
+  template <typename Return>
+  using ForReturn = typename ForReturnImpl<Return>::type;
+
+  template <typename Signature>
+  using ForSignature = ForReturn<result_of_t<Signature>>;
+
+  template <typename F, typename... A, typename R = result_of_t<F && (A && 
...)>,

Review comment:
       Can you use less terse argument names? I keep thinking `F` means 
"future" here.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, 
detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future 
completes

Review comment:
       Explain which arguments the callback should expect.

##########
File path: cpp/src/arrow/util/task_group.h
##########
@@ -22,22 +22,33 @@
 #include <utility>
 
 #include "arrow/status.h"
+#include "arrow/util/future.h"

Review comment:
       This include doesn't look required?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, 
detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future 
completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at 
least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to 
keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future 
completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished 
(whatever
+  /// callback is registered with this function will run before MarkFinished 
returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an 
Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul 
completion.
+  /// - OnFailure, called against the error (const Status&) on failed 
completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type 
of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as 
soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the 
returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with 
the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete 
when the
+  ///   future returned by the callback completes (and will complete with the 
same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to 
successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run 
immediately
+  /// (before this method returns) and the returned future may already be 
marked complete
+  /// (it will definitely be marked complete if the callback returns a 
non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const 
Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), 
result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), 
result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through 
unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename 
detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
       I don't understand what this one does. Can you add a docstring or 
comment?

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -91,14 +81,17 @@ class ThreadedTaskGroup : public TaskGroup {
       nremaining_.fetch_add(1, std::memory_order_acquire);
 
       auto self = checked_pointer_cast<ThreadedTaskGroup>(shared_from_this());
-      Status st = executor_->Spawn([self, task]() {
-        if (self->ok_.load(std::memory_order_acquire)) {
-          // XXX what about exceptions?
-          Status st = task();
-          self->UpdateStatus(std::move(st));
-        }
-        self->OneTaskDone();
-      });
+      Status st = executor_->Spawn(std::bind(
+          [](const std::shared_ptr<ThreadedTaskGroup>& self,
+             const std::function<Status()>& task) {
+            if (self->ok_.load(std::memory_order_acquire)) {
+              // XXX what about exceptions?
+              Status st = task();
+              self->UpdateStatus(std::move(st));
+            }
+            self->OneTaskDone();
+          },
+          std::move(self), std::move(task)));

Review comment:
       C++14 would make this easier...

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -116,25 +109,15 @@ class ThreadedTaskGroup : public TaskGroup {
       cv_.wait(lock, [&]() { return nremaining_.load() == 0; });
       // Current tasks may start other tasks, so only set this when done
       finished_ = true;
-      if (parent_) {
-        parent_->OneTaskDone();
-      }
+      completion_future_.MarkFinished(status_);
     }
     return status_;
   }
 
   int parallelism() override { return executor_->GetCapacity(); }
 
-  std::shared_ptr<TaskGroup> MakeSubGroup() override {
-    std::lock_guard<std::mutex> lock(mutex_);
-    auto child = new ThreadedTaskGroup(executor_);
-    child->parent_ = this;
-    nremaining_.fetch_add(1, std::memory_order_acquire);
-    return std::shared_ptr<TaskGroup>(child);
-  }
-
  protected:
-  void UpdateStatus(Status&& st) {
+  void UpdateStatus(const Status& st) {

Review comment:
       Hmm, `st` is moved below.

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -159,13 +142,13 @@ class ThreadedTaskGroup : public TaskGroup {
   Executor* executor_;
   std::atomic<int32_t> nremaining_;
   std::atomic<bool> ok_;
+  Future<> completion_future_ = Future<>::Make();

Review comment:
       This isn't used yet? Or does its presence have side effects?

##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -103,8 +103,40 @@ static void ThreadPoolSpawn(benchmark::State& state) {
   state.SetItemsProcessed(state.iterations() * nspawns);
 }
 
+// Benchmark ThreadPool::Submit
+static void ThreadPoolSubmit(benchmark::State& state) {  // NOLINT non-const 
reference
+  const auto nthreads = static_cast<int>(state.range(0));
+  const auto workload_size = static_cast<int32_t>(state.range(1));
+
+  Workload workload(workload_size);
+
+  const int32_t nspawns = 10000000 / workload_size + 1;
+
+  for (auto _ : state) {
+    state.PauseTiming();
+    auto pool = *ThreadPool::Make(nthreads);
+    std::atomic<int32_t> n_finished{0};
+    state.ResumeTiming();
+
+    for (int32_t i = 0; i < nspawns; ++i) {
+      // Pass the task by reference to avoid copying it around
+      (void)DeferNotOk(pool->Submit(std::ref(workload))).Then([&](...) {

Review comment:
       This is benchmarking much more than `Submit`, because it is also 
stressing the callback and continuation management. I would rather have 
separate `Future` micro-benchmarks (for construction, simple use cases, 
callbacks, continuations...).

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted 
if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return 
Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return 
Future<>::Make(); });

Review comment:
       Out of curiosity, what happens if one of these callbacks captures `fut`?

##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -136,21 +168,24 @@ static void ThreadedTaskGroup(benchmark::State& state) {
 
   for (auto _ : state) {
     auto task_group = TaskGroup::MakeThreaded(pool.get());
-    for (int32_t i = 0; i < nspawns; ++i) {
-      // Pass the task by reference to avoid copying it around
-      task_group->Append(std::ref(task));
-    }
+    task_group->Append([&task, nspawns, task_group] {

Review comment:
       Why the nested append?

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +122,21 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  ASSERT_TRUE(fut.Wait(0.1));

Review comment:
       Hmm... this doesn't match the comment above anymore.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -768,4 +1361,27 @@ TYPED_TEST(FutureIteratorTest, ErrorsAsCompleted) { 
this->TestErrorsAsCompleted(
 
 TYPED_TEST(FutureIteratorTest, StressAsCompleted) { 
this->TestStressAsCompleted(); }
 
+namespace internal {
+TEST(FnOnceTest, MoveOnlyDataType) {
+  // ensuring this is valid guarantees we are making no unnecessary copies

Review comment:
       Nice. Do we want to use `FnOnce` in other places at some point? (perhaps 
open a JIRA?)

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted 
if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return 
Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return 
Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) 
{
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain 
alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing 
will finish
+  // it.  In theory the intermediate futures could be finished by some 
external process
+  // but that would be highly unusual and bad practice so in reality this 
would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = 
result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = 
result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& 
result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& 
s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure

Review comment:
       Did you copy/paste this? This seems to be the same test snippet as a 
couple of dozen lines above.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted 
if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return 
Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return 
Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) 
{
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain 
alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing 
will finish
+  // it.  In theory the intermediate futures could be finished by some 
external process
+  // but that would be highly unusual and bad practice so in reality this 
would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = 
result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = 
result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& 
result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& 
s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int i) { return std::to_string(i); },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status

Review comment:
       Ditto? (not sure)

##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -103,8 +103,40 @@ static void ThreadPoolSpawn(benchmark::State& state) {
   state.SetItemsProcessed(state.iterations() * nspawns);
 }
 
+// Benchmark ThreadPool::Submit
+static void ThreadPoolSubmit(benchmark::State& state) {  // NOLINT non-const 
reference
+  const auto nthreads = static_cast<int>(state.range(0));
+  const auto workload_size = static_cast<int32_t>(state.range(1));
+
+  Workload workload(workload_size);
+
+  const int32_t nspawns = 10000000 / workload_size + 1;
+
+  for (auto _ : state) {
+    state.PauseTiming();
+    auto pool = *ThreadPool::Make(nthreads);
+    std::atomic<int32_t> n_finished{0};
+    state.ResumeTiming();
+
+    for (int32_t i = 0; i < nspawns; ++i) {
+      // Pass the task by reference to avoid copying it around
+      (void)DeferNotOk(pool->Submit(std::ref(workload))).Then([&](...) {

Review comment:
       (the `Future` micro-benchmarks can be deferred to another PR, though)

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, 
detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future 
completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at 
least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to 
keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future 
completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished 
(whatever
+  /// callback is registered with this function will run before MarkFinished 
returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an 
Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successful 
completion.
+  /// - OnFailure, called against the error (const Status&) on failed 
completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type 
of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will complete successfully as 
soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the 
returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with 
the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete 
when the
+  ///   future returned by the callback completes (and will complete with the 
same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to 
successfully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run 
immediately
+  /// (before this method returns) and the returned future may already be 
marked complete
+  /// (it will definitely be marked complete if the callback returns a 
non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const 
Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), 
result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), 
result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through 
unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename 
detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
       Hmm, looking at the tests, I think I understand. The `StatusOnly` idiom 
doesn't look terrific. How about instead declaring this as `ContinuedFuture 
ThenBoth(OnComplete&& on_complete)`, where `OnComplete` would take either a 
`Result<T>` or a `Status`, depending on the future type?

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted 
if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return 
Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return 
Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) 
{
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain 
alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing 
will finish
+  // it.  In theory the intermediate futures could be finished by some 
external process
+  // but that would be highly unusual and bad practice so in reality this 
would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = 
result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = 
result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& 
result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& 
s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int i) { return std::to_string(i); },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 =
+        fut.Then(StatusOnly, [&status_seen](const Status& s) { status_seen = 
s; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, FutureStatus) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result, innerFut](const int& i) {
+      passed_in_result = i;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then([innerFut](const int& i) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](const int& i) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [&innerFut](const Status&) { return 
innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& 
s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, Result) {
+  {

Review comment:
       Do we want to test cases where a failed `Result` is returned? Though it 
should work, since `MarkFinished` accepts it.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted 
if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return 
Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return 
Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) 
{
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain 
alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing 
will finish
+  // it.  In theory the intermediate futures could be finished by some 
external process
+  // but that would be highly unusual and bad practice so in reality this 
would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = 
result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = 
result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& 
result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& 
s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure

Review comment:
       Ditto here, i.e. the same question as in my previous comment on the `Result` test? (though I'm not sure)

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +122,21 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  ASSERT_TRUE(fut.Wait(0.1));

Review comment:
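       Perhaps instead add separate `AssertSuccessfulSoon` and 
`AssertFailedSoon` helpers that do the waiting, so that `AssertSuccessful` 
keeps asserting that the future is successful *now*, as its comment says?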

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted 
if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return 
Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return 
Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) 
{
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain 
alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing 
will finish
+  // it.  In theory the intermediate futures could be finished by some 
external process
+  // but that would be highly unusual and bad practice so in reality this 
would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = 
result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = 
result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback: fut2 stays unfinished until the returned innerFut finishes
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& 
result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure: the error on fut fails fut2; innerFut is never finished
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure: the error handler observes the IOError and returns innerFut
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void: continuation on a Future<> that finishes successfully
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status: a failed Future<> fails fut2 without finishing innerFut
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status: StatusOnly callback sees the error, returns innerFut
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& 
s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return 
innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int i) { return std::to_string(i); },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status

Review comment:
       Ditto?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to