Added overload of process::await that takes and returns single future.

Review: https://reviews.apache.org/r/61152


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e8394b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e8394b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e8394b4

Branch: refs/heads/1.3.x
Commit: 4e8394b48e53d7c962cc9d53e0f6a90be9a7e74e
Parents: 0e997be
Author: Benjamin Hindman <benjamin.hind...@gmail.com>
Authored: Wed Dec 28 06:42:06 2016 -0800
Committer: Gilbert Song <songzihao1...@gmail.com>
Committed: Mon Mar 5 18:14:59 2018 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/collect.hpp | 56 ++++++++++++++++
 3rdparty/libprocess/src/tests/collect_tests.cpp | 67 ++++++++++++++++++++
 2 files changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4e8394b4/3rdparty/libprocess/include/process/collect.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp
index fccf55a..43f55ca 100644
--- a/3rdparty/libprocess/include/process/collect.hpp
+++ b/3rdparty/libprocess/include/process/collect.hpp
@@ -57,6 +57,62 @@ template <typename... Ts>
 Future<std::tuple<Future<Ts>...>> await(const Future<Ts>&... futures);
 
 
+// Waits on the future specified and returns after the future has been
+// completed or the await has been discarded. This is useful when
+// wanting to "break out" of a future chain if a discard occurs but
+// the underlying future has not been discarded. For example:
+//
+//   Future<string> foo()
+//   {
+//     return bar()
+//       .then([](int i) {
+//         return stringify(i);
+//       });
+//   }
+//
+//   Future<string> future = foo();
+//   future.discard();
+//
+// In the above code we'll propagate the discard to `bar()` but might
+// wait forever if `bar()` can't discard its computation. In some
+// circumstances you might want to break out early and you can do that
+// by using `await`, because if we discard an `await` that function
+// will return even though all of the futures it is waiting on have
+// not been discarded (in other words, the `await` can be properly
+// discarded even if the underlying futures have not been discarded).
+//
+//   Future<string> foo()
+//   {
+//     return await(bar())
+//       .recover([](const Future<Future<int>>& future) {
+//         if (future.isDiscarded()) {
+//           cleanup();
+//         }
+//         return Failure("Discarded waiting");
+//       })
+//       .then([](const Future<int>& future) {
+//         return future
+//           .then([](int i) {
+//             return stringify(i);
+//           });
+//       });
+//   }
+//
+//   Future<string> future = foo();
+//   future.discard();
+//
+// Using `await` will enable you to continue execution even if `bar()`
+// does not (or cannot) discard its execution.
+template <typename T>
+Future<Future<T>> await(const Future<T>& future)
+{
+  return await(std::list<Future<T>>{future})
+    .then([=]() {
+      return Future<Future<T>>(future);
+    });
+}
+
+
 namespace internal {
 
 template <typename T>

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e8394b4/3rdparty/libprocess/src/tests/collect_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/collect_tests.cpp b/3rdparty/libprocess/src/tests/collect_tests.cpp
index 155e0bb..0dd3771 100644
--- a/3rdparty/libprocess/src/tests/collect_tests.cpp
+++ b/3rdparty/libprocess/src/tests/collect_tests.cpp
@@ -10,15 +10,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License
 
+#include <string>
+
 #include <process/collect.hpp>
 #include <process/gtest.hpp>
 
 #include <stout/gtest.hpp>
+#include <stout/stringify.hpp>
 
 using process::Future;
 using process::Promise;
 
 using std::list;
+using std::string;
 
 TEST(CollectTest, Ready)
 {
@@ -242,3 +246,66 @@ TEST(AwaitTest, DiscardPropagation)
   AWAIT_DISCARDED(future1);
   AWAIT_DISCARDED(future2);
 }
+
+
+TEST(AwaitTest, AwaitSingleDiscard)
+{
+  Promise<int> promise;
+
+  auto bar = [&]() {
+    return promise.future();
+  };
+
+  auto foo = [&]() {
+    return await(bar())
+      .then([](const Future<int>& f) {
+        return f
+          .then([](int i) {
+            return stringify(i);
+          });
+      });
+  };
+
+  Future<string> future = foo();
+
+  future.discard();
+
+  AWAIT_DISCARDED(future);
+
+  EXPECT_TRUE(promise.future().hasDiscard());
+}
+
+
+TEST(AwaitTest, AwaitSingleAbandon)
+{
+  Owned<Promise<int>> promise(new Promise<int>());
+
+  auto bar = [&]() {
+    return promise->future();
+  };
+
+  auto foo = [&]() {
+    return await(bar())
+      .then([](const Future<int>& f) {
+        return f
+          .then([](int i) {
+            return stringify(i);
+          });
+      });
+  };
+
+  // There is a race from the time that we reset the promise to when
+  // the await process is terminated so we need to use Future::recover
+  // to properly handle this case.
+  Future<string> future = foo()
+    .recover([](const Future<string>& f) -> Future<string> {
+      if (f.isAbandoned()) {
+        return "hello";
+      }
+      return f;
+    });
+
+  promise.reset();
+
+  AWAIT_EQ("hello", future);
+}

Reply via email to