Handle discards on the master detection futures. Review: https://reviews.apache.org/r/23867
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e20ea630 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e20ea630 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e20ea630 Branch: refs/heads/master Commit: e20ea63025c158f8377df24650c8ee2c0cabf68c Parents: 2b5fb07 Author: Benjamin Mahler <[email protected]> Authored: Wed Jul 23 15:45:05 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Mon Aug 4 13:55:27 2014 -0700 ---------------------------------------------------------------------- src/master/detector.cpp | 39 ++++++++++++++++++++++ src/tests/master_contender_detector_tests.cpp | 33 ++++++++++++++++-- 2 files changed, 70 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e20ea630/src/master/detector.cpp ---------------------------------------------------------------------- diff --git a/src/master/detector.cpp b/src/master/detector.cpp index 93a6cb2..6436b8e 100644 --- a/src/master/detector.cpp +++ b/src/master/detector.cpp @@ -94,6 +94,21 @@ void discard(std::set<Promise<T>* >* promises) promises->clear(); } + +// Helper for discarding an individual promise in the set. +template <typename T> +void discard(std::set<Promise<T>* >* promises, const Future<T>& future) +{ + foreach (Promise<T>* promise, *promises) { + if (promise->future() == future) { + promise->discard(); + promises->erase(promise); + delete promise; + return; + } + } +} + } // namespace promises { @@ -127,11 +142,21 @@ public: } Promise<Option<MasterInfo> >* promise = new Promise<Option<MasterInfo> >(); + + promise->future() + .onDiscard(defer(self(), &Self::discard, promise->future())); + promises.insert(promise); return promise->future(); } private: + void discard(const Future<Option<MasterInfo> >& future) + { + // Discard the promise holding this future. + promises::discard(&promises, future); + } + Option<MasterInfo> leader; // The appointed master. set<Promise<Option<MasterInfo> >*> promises; }; @@ -149,6 +174,8 @@ public: Future<Option<MasterInfo> > detect(const Option<MasterInfo>& previous); private: + void discard(const Future<Option<MasterInfo> >& future); + // Invoked when the group leadership has changed. void detected(const Future<Option<Group::Membership> >& leader); @@ -293,6 +320,14 @@ void ZooKeeperMasterDetectorProcess::initialize() } +void ZooKeeperMasterDetectorProcess::discard( + const Future<Option<MasterInfo> >& future) +{ + // Discard the promise holding this future. + promises::discard(&promises, future); +} + + Future<Option<MasterInfo> > ZooKeeperMasterDetectorProcess::detect( const Option<MasterInfo>& previous) { @@ -307,6 +342,10 @@ Future<Option<MasterInfo> > ZooKeeperMasterDetectorProcess::detect( } Promise<Option<MasterInfo> >* promise = new Promise<Option<MasterInfo> >(); + + promise->future() + .onDiscard(defer(self(), &Self::discard, promise->future())); + promises.insert(promise); return promise->future(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/e20ea630/src/tests/master_contender_detector_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp index 231648d..fdddfa1 100644 --- a/src/tests/master_contender_detector_tests.cpp +++ b/src/tests/master_contender_detector_tests.cpp @@ -167,6 +167,16 @@ TEST(BasicMasterContenderDetectorTest, Detector) // No one has appointed the leader so we are pending. EXPECT_TRUE(detected.isPending()); + // Ensure that the future can be discarded. + detected.discard(); + + AWAIT_DISCARDED(detected); + + detected = detector.detect(); + + // Still no leader appointed yet. + EXPECT_TRUE(detected.isPending()); + detector.appoint(master); AWAIT_READY(detected); @@ -202,7 +212,28 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender) ZooKeeperMasterDetector detector(url.get()); Future<Option<MasterInfo> > leader = detector.detect(); + + AWAIT_READY(leader); EXPECT_SOME_EQ(master, leader.get()); + + leader = detector.detect(leader.get()); + + // No change to leadership. + ASSERT_TRUE(leader.isPending()); + + // Ensure we can discard the future. + leader.discard(); + + AWAIT_DISCARDED(leader); + + // After the discard, we can re-detect correctly. + leader = detector.detect(None()); + + AWAIT_READY(leader); + EXPECT_SOME_EQ(master, leader.get()); + + // Now test that a session expiration causes candidacy to be lost + // and the future to become ready. Future<Nothing> lostCandidacy = contended.get(); leader = detector.detect(leader.get()); @@ -210,8 +241,6 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender) AWAIT_READY(sessionId); server->expireSession(sessionId.get().get()); - // Session expiration causes candidacy to be lost and the - // Future<Nothing> to be fulfilled. AWAIT_READY(lostCandidacy); AWAIT_READY(leader); EXPECT_NONE(leader.get());
