Repository: mesos Updated Branches: refs/heads/master adecbfa6a -> c687237e8
Added Heartbeater to master to send periodic heartbeats to HTTP schedulers. Review: https://reviews.apache.org/r/37277 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c687237e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c687237e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c687237e Branch: refs/heads/master Commit: c687237e8e4e7e40e6c7a9d8ba6f2e7d90dab155 Parents: adecbfa Author: Vinod Kone <[email protected]> Authored: Sat Aug 8 19:34:59 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu Aug 13 11:08:21 2015 -0700 ---------------------------------------------------------------------- src/internal/evolve.cpp | 6 +++ src/internal/evolve.hpp | 3 ++ src/master/constants.cpp | 1 + src/master/constants.hpp | 5 ++ src/master/master.cpp | 22 ++++++-- src/master/master.hpp | 106 +++++++++++++++++++++++++++++++++----- src/tests/http_api_tests.cpp | 46 ++++++++++++++++- 7 files changed, 170 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/internal/evolve.cpp ---------------------------------------------------------------------- diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp index 4678d67..11ce9e7 100644 --- a/src/internal/evolve.cpp +++ b/src/internal/evolve.cpp @@ -116,6 +116,12 @@ v1::scheduler::Call evolve(const scheduler::Call& call) } +v1::scheduler::Event evolve(const scheduler::Event& event) +{ + return evolve<v1::scheduler::Event>(event); +} + + v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message) { v1::scheduler::Event event; http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/internal/evolve.hpp ---------------------------------------------------------------------- diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp index 2e03559..13e9f52 100644 --- a/src/internal/evolve.hpp +++ 
b/src/internal/evolve.hpp @@ -64,6 +64,9 @@ google::protobuf::RepeatedPtrField<T1> evolve( } +v1::scheduler::Event evolve(const scheduler::Event& event); + + // Helper functions that evolve old style internal messages to a // v1::scheduler::Event. v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message); http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/master/constants.cpp ---------------------------------------------------------------------- diff --git a/src/master/constants.cpp b/src/master/constants.cpp index fbcae60..918dd70 100644 --- a/src/master/constants.cpp +++ b/src/master/constants.cpp @@ -31,6 +31,7 @@ namespace master { const int MAX_OFFERS_PER_FRAMEWORK = 50; const double MIN_CPUS = 0.01; const Bytes MIN_MEM = Megabytes(32); +const Duration DEFAULT_HEARTBEAT_INTERVAL = Seconds(15); const Duration DEFAULT_SLAVE_PING_TIMEOUT = Seconds(15); const size_t DEFAULT_MAX_SLAVE_PING_TIMEOUTS = 5; const Duration MIN_SLAVE_REREGISTER_TIMEOUT = Minutes(10); http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/master/constants.hpp ---------------------------------------------------------------------- diff --git a/src/master/constants.hpp b/src/master/constants.hpp index 7cec18b..7afa1ec 100644 --- a/src/master/constants.hpp +++ b/src/master/constants.hpp @@ -54,6 +54,11 @@ extern const double MIN_CPUS; // Minimum amount of memory per offer. extern const Bytes MIN_MEM; + +// Default interval the master uses to send heartbeats to an HTTP +// scheduler. +extern const Duration DEFAULT_HEARTBEAT_INTERVAL; + // Amount of time within which a slave PING should be received. // NOTE: The slave uses these PING constants to determine when // the master has stopped sending pings. 
If these are made http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 398203d..83d2f44 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1778,6 +1778,10 @@ void Master::subscribe( // TODO(anand): Authenticate the framework. const FrameworkInfo& frameworkInfo = subscribe.framework_info(); + + LOG(INFO) << "Received subscription request for" + << " HTTP framework '" << frameworkInfo.name() << "'"; + Option<Error> validationError = None(); // TODO(vinod): Deprecate this in favor of ACLs. @@ -1808,9 +1812,6 @@ void Master::subscribe( } } - LOG(INFO) << "Received subscription request for" - << " HTTP framework '" << frameworkInfo.name() << "'"; - if (validationError.isSome()) { LOG(INFO) << "Refusing subscription of framework" << " '" << frameworkInfo.name() << "': " @@ -1889,6 +1890,10 @@ void Master::_subscribe( message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); framework->send(message); + + // Start the heartbeat after sending SUBSCRIBED event. + framework->heartbeat(); + return; } @@ -1934,7 +1939,7 @@ void Master::_subscribe( return; } else { LOG(INFO) << "Allowing framework " << *framework - << " to subcribe with an already used id"; + << " to subscribe with an already used id"; // Convert the framework to an http framework if it was // pid based in the past. @@ -1958,6 +1963,9 @@ void Master::_subscribe( message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); framework->send(message); + + // Start the heartbeat after sending SUBSCRIBED event. 
+ framework->heartbeat(); } } else { // We don't have a framework with this ID, so we must be a newly @@ -1986,6 +1994,9 @@ void Master::_subscribe( message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); framework->send(message); + + // Start the heartbeat after sending SUBSCRIBED event. + framework->heartbeat(); } CHECK(frameworks.registered.contains(frameworkInfo.id())) @@ -5005,6 +5016,9 @@ void Master::failoverFramework(Framework* framework, const HttpConnection& http) .onAny(defer(self(), &Self::exited, framework->id(), http)); _failoverFramework(framework); + + // Start the heartbeat after sending SUBSCRIBED event. + framework->heartbeat(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 4e29470..b353b8e 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -586,7 +586,7 @@ protected: // the event of a scheduler failover. void failoverFramework(Framework* framework, const process::UPID& newPid); - // Replace the scheduler for a framework with a new http connection, + // Replace the scheduler for a framework with a new HTTP connection, // in the event of a scheduler failover. void failoverFramework(Framework* framework, const HttpConnection& http); @@ -1278,6 +1278,47 @@ struct HttpConnection }; +// This process periodically sends heartbeats to a scheduler on the +// given HTTP connection. 
+class Heartbeater : public process::Process<Heartbeater> +{ +public: + Heartbeater(const FrameworkID& _frameworkId, + const HttpConnection& _http, + const Duration& _interval) + : process::ProcessBase(process::ID::generate("heartbeater")), + frameworkId(_frameworkId), + http(_http), + interval(_interval) {} + +protected: + virtual void initialize() override + { + heartbeat(); + } + +private: + void heartbeat() + { + // Only send a heartbeat if the connection is not closed. + if (http.closed().isPending()) { + VLOG(1) << "Sending heartbeat to " << frameworkId; + + scheduler::Event event; + event.set_type(scheduler::Event::HEARTBEAT); + + http.send(event); + } + + process::delay(interval, self(), &Self::heartbeat); + } + + const FrameworkID frameworkId; + HttpConnection http; + const Duration interval; +}; + + // Information about a connected or completed framework. // TODO(bmahler): Keeping the task and executor information in sync // across the Slave and Framework structs is error prone! @@ -1311,10 +1352,8 @@ struct Framework ~Framework() { - if (http.isSome() && connected) { - if (!http.get().close()) { - LOG(WARNING) << "Failed to close HTTP pipe for " << *this; - } + if (http.isSome()) { + cleanupConnection(); } } @@ -1531,11 +1570,10 @@ struct Framework void updateConnection(const process::UPID& newPid) { - // Remove the http connnection if this is a downgrade from - // http to pid, note the connection may already be closed. + // Cleanup the HTTP connection if this is a downgrade from HTTP + // to PID. Note that the connection may already be closed. if (http.isSome()) { - http.get().close(); - http = None(); + cleanupConnection(); } // TODO(benh): unlink(oldPid); @@ -1544,20 +1582,57 @@ struct Framework void updateConnection(const HttpConnection& newHttp) { - // Wipe the pid if this is an upgrade from pid to http. if (pid.isSome()) { + // Wipe the PID if this is an upgrade from PID to HTTP. 
// TODO(benh): unlink(oldPid); pid = None(); + } else { + // Cleanup the old HTTP connection. + // Note that master creates a new HTTP connection for every + // subscribe request, so 'newHttp' should always be different + // from 'http'. + cleanupConnection(); } - // Close the existing connection if it has changed. - if (http.isSome() && http.get().writer != newHttp.writer) { - http.get().close(); - } + CHECK_NONE(http); http = newHttp; } + // Closes the connection and stops the heartbeat. + // TODO(vinod): Currently 'connected' variable is set separately + // from this method. We need to make sure these are in sync. + void cleanupConnection() + { + CHECK_SOME(http); + + if (connected && !http.get().close()) { + LOG(WARNING) << "Failed to close HTTP pipe for " << *this; + } + + http = None(); + + CHECK_SOME(heartbeater); + + terminate(heartbeater.get().get()); + wait(heartbeater.get().get()); + + heartbeater = None(); + } + + void heartbeat() + { + CHECK_NONE(heartbeater); + CHECK_SOME(http); + + // TODO(vinod): Make heartbeat interval configurable and include + // this information in the SUBSCRIBED response. + heartbeater = + new Heartbeater(info.id(), http.get(), DEFAULT_HEARTBEAT_INTERVAL); + + process::spawn(heartbeater.get().get()); + } + Master* const master; FrameworkInfo info; @@ -1626,6 +1701,9 @@ struct Framework Resources totalOfferedResources; hashmap<SlaveID, Resources> offeredResources; + // This is only set for HTTP frameworks. + Option<process::Owned<Heartbeater>> heartbeater; + private: Framework(const Framework&); // No copying. Framework& operator=(const Framework&); // No assigning. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/tests/http_api_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/http_api_tests.cpp b/src/tests/http_api_tests.cpp index aef3c4b..1d11d29 100644 --- a/src/tests/http_api_tests.cpp +++ b/src/tests/http_api_tests.cpp @@ -21,6 +21,7 @@ #include <mesos/v1/mesos.hpp> #include <mesos/v1/scheduler.hpp> +#include <process/clock.hpp> #include <process/future.hpp> #include <process/gtest.hpp> #include <process/http.hpp> @@ -33,11 +34,13 @@ #include "common/http.hpp" #include "common/recordio.hpp" +#include "master/constants.hpp" #include "master/master.hpp" #include "tests/mesos.hpp" #include "tests/utils.hpp" +using mesos::internal::master::DEFAULT_HEARTBEAT_INTERVAL; using mesos::internal::master::Master; @@ -46,6 +49,7 @@ using mesos::internal::recordio::Reader; using mesos::v1::scheduler::Call; using mesos::v1::scheduler::Event; +using process::Clock; using process::Future; using process::PID; @@ -254,7 +258,8 @@ TEST_P(HttpApiTest, UnsupportedContentMediaType) // This test verifies if the scheduler is able to receive a Subscribed -// event on the stream in response to a Subscribe call request. +// event and heartbeat events on the stream in response to a Subscribe +// call request. TEST_P(HttpApiTest, Subscribe) { // HTTP schedulers cannot yet authenticate. @@ -302,6 +307,23 @@ TEST_P(HttpApiTest, Subscribe) ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type()); EXPECT_NE("", event.get().get().subscribed().framework_id().value()); + // Make sure it receives a heartbeat. + event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + ASSERT_EQ(Event::HEARTBEAT, event.get().get().type()); + + // Advance the clock to receive another heartbeat. 
+ Clock::pause(); + Clock::advance(DEFAULT_HEARTBEAT_INTERVAL); + + event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + ASSERT_EQ(Event::HEARTBEAT, event.get().get().type()); + Shutdown(); } @@ -387,6 +409,13 @@ TEST_P(HttpApiTest, SubscribedOnRetryWithForce) // Check event type is subscribed and the same framework id is set. ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type()); EXPECT_EQ(frameworkId, event.get().get().subscribed().framework_id()); + + // Make sure it receives a heartbeat. + event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + ASSERT_EQ(Event::HEARTBEAT, event.get().get().type()); } Shutdown(); @@ -474,6 +503,13 @@ TEST_P(HttpApiTest, UpdatePidToHttpScheduler) EXPECT_EQ(evolve(frameworkId.get()), event.get().get().subscribed().framework_id()); + // Make sure it receives a heartbeat. + event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + ASSERT_EQ(Event::HEARTBEAT, event.get().get().type()); + driver.stop(); driver.join(); @@ -553,6 +589,14 @@ TEST_P(HttpApiTest, UpdatePidToHttpSchedulerWithoutForce) // was already connected. ASSERT_EQ(Event::ERROR, event.get().get().type()); + // Unsubscribed HTTP framework should not get any heartbeats. + Clock::pause(); + Clock::advance(DEFAULT_HEARTBEAT_INTERVAL); + Clock::settle(); + + event = responseDecoder.read(); + ASSERT_TRUE(event.isPending()); + driver.stop(); driver.join();
