Added overload of process::await that takes and returns single future. Review: https://reviews.apache.org/r/61152
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e8394b4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e8394b4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e8394b4 Branch: refs/heads/1.3.x Commit: 4e8394b48e53d7c962cc9d53e0f6a90be9a7e74e Parents: 0e997be Author: Benjamin Hindman <[email protected]> Authored: Wed Dec 28 06:42:06 2016 -0800 Committer: Gilbert Song <[email protected]> Committed: Mon Mar 5 18:14:59 2018 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/collect.hpp | 56 ++++++++++++++++ 3rdparty/libprocess/src/tests/collect_tests.cpp | 67 ++++++++++++++++++++ 2 files changed, 123 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4e8394b4/3rdparty/libprocess/include/process/collect.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp index fccf55a..43f55ca 100644 --- a/3rdparty/libprocess/include/process/collect.hpp +++ b/3rdparty/libprocess/include/process/collect.hpp @@ -57,6 +57,62 @@ template <typename... Ts> Future<std::tuple<Future<Ts>...>> await(const Future<Ts>&... futures); +// Waits on the future specified and returns after the future has been +// completed or the await has been discarded. This is useful when +// wanting to "break out" of a future chain if a discard occurs but +// the underlying future has not been discarded. For example: +// +// Future<string> foo() +// { +// return bar() +// .then([](int i) { +// return stringify(i); +// }); +// } +// +// Future<stringify> future = foo(); +// future.discard(); +// +// In the above code we'll propagate the discard to `bar()` but might +// wait forever if `bar()` can't discard their computation. In some +// circumstances you might want to break out early and you can do that +// by using `await`, because if we discard an `await` that function +// will return even though all of the future's it is waiting on have +// not been discarded (in other words, the `await` can be properly +// discarded even if the underlying futures have not been discarded). +// +// Future<string> foo() +// { +// return await(bar()) +// .recover([](const Future<Future<string>>& future) { +// if (future.isDiscarded()) { +// cleanup(); +// } +// return Failure("Discarded waiting"); +// }) +// .then([](const Future<int>& future) { +// return future +// .then([](int i) { +// return stringify(i); +// }); +// }); +// } +// +// Future<string> future = foo(); +// future.discard(); +// +// Using `await` will enable you to continue execution even if `bar()` +// does not (or can not) discard their execution. +template <typename T> +Future<Future<T>> await(const Future<T>& future) +{ + return await(std::list<Future<T>>{future}) + .then([=]() { + return Future<Future<T>>(future); + }); +} + + namespace internal { template <typename T> http://git-wip-us.apache.org/repos/asf/mesos/blob/4e8394b4/3rdparty/libprocess/src/tests/collect_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/collect_tests.cpp b/3rdparty/libprocess/src/tests/collect_tests.cpp index 155e0bb..0dd3771 100644 --- a/3rdparty/libprocess/src/tests/collect_tests.cpp +++ b/3rdparty/libprocess/src/tests/collect_tests.cpp @@ -10,15 +10,19 @@ // See the License for the specific language governing permissions and // limitations under the License +#include <string> + #include <process/collect.hpp> #include <process/gtest.hpp> #include <stout/gtest.hpp> +#include <stout/stringify.hpp> using process::Future; using process::Promise; using std::list; +using std::string; TEST(CollectTest, Ready) { @@ -242,3 +246,66 @@ TEST(AwaitTest, DiscardPropagation) AWAIT_DISCARDED(future1); AWAIT_DISCARDED(future2); } + + +TEST(AwaitTest, AwaitSingleDiscard) +{ + Promise<int> promise; + + auto bar = [&]() { + return promise.future(); + }; + + auto foo = [&]() { + return await(bar()) + .then([](const Future<int>& f) { + return f + .then([](int i) { + return stringify(i); + }); + }); + }; + + Future<string> future = foo(); + + future.discard(); + + AWAIT_DISCARDED(future); + + EXPECT_TRUE(promise.future().hasDiscard()); +} + + +TEST(AwaitTest, AwaitSingleAbandon) +{ + Owned<Promise<int>> promise(new Promise<int>()); + + auto bar = [&]() { + return promise->future(); + }; + + auto foo = [&]() { + return await(bar()) + .then([](const Future<int>& f) { + return f + .then([](int i) { + return stringify(i); + }); + }); + }; + + // There is a race from the time that we reset the promise to when + // the await process is terminated so we need to use Future::recover + // to properly handle this case. + Future<string> future = foo() + .recover([](const Future<string>& f) -> Future<string> { + if (f.isAbandoned()) { + return "hello"; + } + return f; + }); + + promise.reset(); + + AWAIT_EQ("hello", future); +}
