Repository: mesos Updated Branches: refs/heads/master 3544df756 -> e833793cc
Updated Master::exited to handle http frameworks. Review: https://reviews.apache.org/r/36720 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e833793c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e833793c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e833793c Branch: refs/heads/master Commit: e833793ccee339e130e541934e55a67eb63bd2ad Parents: 3544df7 Author: Anand Mazumdar <[email protected]> Authored: Tue Aug 4 14:46:22 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Tue Aug 4 15:05:07 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 91 ++++++++++++++++++++++++++++++---------------- src/master/master.hpp | 11 +++++- 2 files changed, 70 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e833793c/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 87e11d5..5aa0a54 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -107,6 +107,8 @@ using process::Time; using process::Timer; using process::UPID; +using process::http::Pipe; + using process::metrics::Counter; namespace mesos { @@ -959,42 +961,31 @@ void Master::finalize() } -void Master::exited(const UPID& pid) +void Master::exited(const FrameworkID& frameworkId, Pipe::Writer writer) { foreachvalue (Framework* framework, frameworks.registered) { - if (framework->pid == pid) { - LOG(INFO) << "Framework " << *framework << " disconnected"; - - // Disconnect the framework. - disconnect(framework); - - // Set 'failoverTimeout' to the default and update only if the - // input is valid. - Try<Duration> failoverTimeout_ = - Duration::create(FrameworkInfo().failover_timeout()); - CHECK_SOME(failoverTimeout_); - Duration failoverTimeout = failoverTimeout_.get(); - - failoverTimeout_ = - Duration::create(framework->info.failover_timeout()); - if (failoverTimeout_.isSome()) { - failoverTimeout = failoverTimeout_.get(); - } else { - LOG(WARNING) << "Using the default value for 'failover_timeout' because" - << "the input value is invalid: " - << failoverTimeout_.error(); - } + if (framework->http.isSome() && framework->http.get().writer == writer) { + CHECK_EQ(frameworkId, framework->id()); + _exited(framework); + return; + } - LOG(INFO) << "Giving framework " << *framework << " " - << failoverTimeout << " to failover"; + // If the framework has reconnected, the writer will not match + // above, and we will have a framework with a matching id. + if (frameworkId == framework->id()) { + LOG(INFO) << "Ignoring disconnection for framework " + << *framework << " as it has already reconnected"; + return; + } + } +} - // Delay dispatching a message to ourselves for the timeout. - delay(failoverTimeout, - self(), - &Master::frameworkFailoverTimeout, - framework->id(), - framework->reregisteredTime); +void Master::exited(const UPID& pid) +{ + foreachvalue (Framework* framework, frameworks.registered) { + if (framework->pid == pid) { + _exited(framework); return; } } @@ -1054,6 +1045,44 @@ void Master::exited(const UPID& pid) } +void Master::_exited(Framework* framework) +{ + LOG(INFO) << "Framework " << *framework << " disconnected"; + + // Disconnect the framework. + disconnect(framework); + + // Set 'failoverTimeout' to the default and update only if the + // input is valid. + Try<Duration> failoverTimeout_ = + Duration::create(FrameworkInfo().failover_timeout()); + + CHECK_SOME(failoverTimeout_); + Duration failoverTimeout = failoverTimeout_.get(); + + failoverTimeout_ = + Duration::create(framework->info.failover_timeout()); + + if (failoverTimeout_.isSome()) { + failoverTimeout = failoverTimeout_.get(); + } else { + LOG(WARNING) << "Using the default value for 'failover_timeout' because" + << "the input value is invalid: " + << failoverTimeout_.error(); + } + + LOG(INFO) << "Giving framework " << *framework << " " + << failoverTimeout << " to failover"; + + // Delay dispatching a message to ourselves for the timeout. + delay(failoverTimeout, + self(), + &Master::frameworkFailoverTimeout, + framework->id(), + framework->reregisteredTime); +} + + void Master::visit(const MessageEvent& event) { // There are three cases about the message's UPID with respect to http://git-wip-us.apache.org/repos/asf/mesos/blob/e833793c/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index cd0a5c8..e441749 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -507,10 +507,16 @@ public: protected: virtual void initialize(); virtual void finalize(); - virtual void exited(const process::UPID& pid); + virtual void visit(const process::MessageEvent& event); virtual void visit(const process::ExitedEvent& event); + virtual void exited(const process::UPID& pid); + void exited( + const FrameworkID& frameworkId, + process::http::Pipe::Writer writer); + void _exited(Framework* framework); + // Invoked when the message is ready to be executed after // being throttled. // 'principal' being None indicates it is throttled by @@ -1270,6 +1276,9 @@ struct Framework auto encoder = recordio::Encoder<scheduler::Event>(serialize); http = Http {writer, encoder}; + + http.get().writer.readerClosed(). + onAny(defer(master->self(), &Master::exited, id(), writer)); } ~Framework()
