Added 'updateSlave()' in master to handle oversubscribed resources. Review: https://reviews.apache.org/r/34730
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/949e6ad1 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/949e6ad1 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/949e6ad1 Branch: refs/heads/master Commit: 949e6ad1c6e24e3446c44519af28dd5f32e3c486 Parents: 0df7bb0 Author: Vinod Kone <[email protected]> Authored: Wed May 20 19:11:44 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu May 28 17:11:04 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 19 +++++++++++++++++++ src/master/master.hpp | 4 ++++ src/master/metrics.cpp | 4 ++++ src/master/metrics.hpp | 1 + src/tests/oversubscription_tests.cpp | 20 ++++++++++++++------ 5 files changed, 42 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 1526f59..d61b77b 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -718,6 +718,11 @@ void Master::initialize() &ExitedExecutorMessage::executor_id, &ExitedExecutorMessage::status); + install<UpdateSlaveMessage>( + &Master::updateSlave, + &UpdateSlaveMessage::slave_id, + &UpdateSlaveMessage::oversubscribed_resources); + install<AuthenticateMessage>( &Master::authenticate, &AuthenticateMessage::pid); @@ -3452,6 +3457,20 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId) } +void Master::updateSlave( + const SlaveID& slaveId, + const vector<Resource>& oversubscribedResources) +{ + ++metrics->messages_update_slave; + + LOG(INFO) << "Received update of slave " << slaveId + << " with oversubscribed resources " << oversubscribedResources; + + // TODO(vinod): Rescind any oustanding revocable offers from this + // slave and update the allocator. +} + + // TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid' // because the status updates will be sent by the slave. void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index c8c6251..c0cc293 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -750,6 +750,10 @@ public: const ExecutorID& executorId, int32_t status); + void updateSlave( + const SlaveID& slaveId, + const std::vector<Resource>& oversubscribedResources); + void shutdownSlave( const SlaveID& slaveId, const std::string& message); http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/metrics.cpp ---------------------------------------------------------------------- diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp index ee09664..264252c 100644 --- a/src/master/metrics.cpp +++ b/src/master/metrics.cpp @@ -117,6 +117,8 @@ Metrics::Metrics(const Master& master) "master/messages_status_update"), messages_exited_executor( "master/messages_exited_executor"), + messages_update_slave( + "master/messages_update_slave"), messages_authenticate( "master/messages_authenticate"), valid_framework_to_executor_messages( @@ -208,6 +210,7 @@ Metrics::Metrics(const Master& master) process::metrics::add(messages_unregister_slave); process::metrics::add(messages_status_update); process::metrics::add(messages_exited_executor); + process::metrics::add(messages_update_slave); // Messages from both schedulers and slaves. process::metrics::add(messages_authenticate); @@ -314,6 +317,7 @@ Metrics::~Metrics() process::metrics::remove(messages_unregister_slave); process::metrics::remove(messages_status_update); process::metrics::remove(messages_exited_executor); + process::metrics::remove(messages_update_slave); // Messages from both schedulers and slaves. process::metrics::remove(messages_authenticate); http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/metrics.hpp ---------------------------------------------------------------------- diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp index 78d0666..833033c 100644 --- a/src/master/metrics.hpp +++ b/src/master/metrics.hpp @@ -138,6 +138,7 @@ struct Metrics process::metrics::Counter messages_unregister_slave; process::metrics::Counter messages_status_update; process::metrics::Counter messages_exited_executor; + process::metrics::Counter messages_update_slave; // Messages from both schedulers and slaves. process::metrics::Counter messages_authenticate; http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/tests/oversubscription_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp index 36a6793..1dda63e 100644 --- a/src/tests/oversubscription_tests.cpp +++ b/src/tests/oversubscription_tests.cpp @@ -33,6 +33,7 @@ #include "slave/slave.hpp" #include "tests/mesos.hpp" +#include "tests/utils.hpp" using namespace process; @@ -44,12 +45,12 @@ namespace mesos { namespace internal { namespace tests { -class OversubscriptionSlaveTest : public MesosTest {}; +class OversubscriptionTest : public MesosTest {}; // This test verifies that slave will forward the estimation of the // oversubscribed resources to the master. -TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage) +TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage) { Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); @@ -71,8 +72,9 @@ TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage) Clock::pause(); - Clock::settle(); + // No update should be sent until there is an estimate. Clock::advance(flags.oversubscribed_resources_interval); + Clock::settle(); ASSERT_FALSE(update.isReady()); @@ -80,12 +82,18 @@ TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage) Resources resources = Resources::parse("cpus:1;mem:32").get(); resourceEstimator.estimate(resources); - Clock::settle(); - Clock::advance(flags.oversubscribed_resources_interval); - AWAIT_READY(update); EXPECT_EQ(Resources(update.get().oversubscribed_resources()), resources); + // Ensure the metric is updated. + JSON::Object metrics = Metrics(); + ASSERT_EQ( + 1u, + metrics.values.count("master/messages_update_slave")); + ASSERT_EQ( + 1u, + metrics.values["master/messages_update_slave"]); + Shutdown(); }
