Repository: mesos
Updated Branches:
  refs/heads/master 5c9529777 -> fbf5c7e70
Updated slave to send total amount of oversubscribed resources. Review: https://reviews.apache.org/r/34729 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0df7bb09 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0df7bb09 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0df7bb09 Branch: refs/heads/master Commit: 0df7bb09894235cac0dbf1dfdb0a23d2799d62e9 Parents: 5c95297 Author: Vinod Kone <[email protected]> Authored: Wed May 20 19:10:52 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu May 28 17:11:01 2015 -0700 ---------------------------------------------------------------------- src/messages/messages.proto | 7 +++-- src/slave/flags.cpp | 8 +++--- src/slave/flags.hpp | 2 +- src/slave/slave.cpp | 46 +++++++++++++++++++++---------- src/slave/slave.hpp | 8 ++++-- src/tests/oversubscription_tests.cpp | 14 +++++----- 6 files changed, 53 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 39dac72..1c8d79e 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -334,10 +334,11 @@ message CheckpointResourcesMessage { // This message is sent by the slave to the master to inform the -// master about the currently oversubscribable resources. -message OversubscribeResourcesMessage { +// master about the total amount of oversubscribed (allocated and +// allocatable) resources. 
+message UpdateSlaveMessage { required SlaveID slave_id = 1; - repeated Resource resources = 2; + repeated Resource oversubscribed_resources = 2; } http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/flags.cpp ---------------------------------------------------------------------- diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp index a8c7c49..6b7c61e 100644 --- a/src/slave/flags.cpp +++ b/src/slave/flags.cpp @@ -467,10 +467,10 @@ mesos::internal::slave::Flags::Flags() "resource_estimator", "The name of the resource estimator to use for oversubscription."); - add(&Flags::oversubscribe_resources_interval, - "oversubscribe_resources_interval", + add(&Flags::oversubscribed_resources_interval, + "oversubscribed_resources_interval", "The slave periodically updates the master with the current estimation\n" - "about the maximum amount of resources that can be oversubscribed. The\n" - "interval between updates is controlled by this flag.", + "about the total amount of oversubscribed resources that are allocated\n" + "and available. 
The interval between updates is controlled by this flag.", Seconds(15)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/flags.hpp ---------------------------------------------------------------------- diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp index 6ca59dc..944ed79 100644 --- a/src/slave/flags.hpp +++ b/src/slave/flags.hpp @@ -101,7 +101,7 @@ public: std::string authenticatee; Option<std::string> hooks; Option<std::string> resource_estimator; - Duration oversubscribe_resources_interval; + Duration oversubscribed_resources_interval; }; } // namespace slave { http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index b4d2029..fdaaea4 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -3982,7 +3982,7 @@ void Slave::__recover(const Future<Nothing>& future) // forward the estimations to the master. resourceEstimator->oversubscribable() .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1)) - .onAny(defer(self(), &Self::forwardOversubscribableResources)); + .onAny(defer(self(), &Self::forwardOversubscribedResources)); // Start detecting masters. detection = detector->detect() @@ -4090,34 +4090,50 @@ void Slave::updateOversubscribableResources(const Future<Resources>& future) } -void Slave::forwardOversubscribableResources() +void Slave::forwardOversubscribedResources() { if (state != RUNNING) { - delay(Seconds(1), self(), &Self::forwardOversubscribableResources); + delay(Seconds(1), self(), &Self::forwardOversubscribedResources); return; } - // We only forward updates after the first estimation is received. - if (oversubscribableResources.isNone()) { - delay(Seconds(1), self(), &Self::forwardOversubscribableResources); - return; + // Calculate the latest allocation of oversubscribed resources. 
+ // Note that this allocation value might be different from the + // master's view because new task/executor might be in flight from + // the master or pending on the slave etc. This is ok because the + // allocator only considers the slave's view of allocation when + // calculating the available oversubscribed resources to offer. + Resources oversubscribed; + foreachvalue (Framework* framework, frameworks) { + foreachvalue (Executor* executor, framework->executors) { + oversubscribed += executor->resources.revocable(); + } } - CHECK_SOME(master); - CHECK_SOME(oversubscribableResources); + // Add oversubscribable resources to the total. + oversubscribed += oversubscribableResources; - LOG(INFO) << "Forwarding oversubscribable resources " - << oversubscribableResources.get(); + if (oversubscribed == oversubscribedResources) { + VLOG(1) << "Not forwarding total oversubscribed resources because the" + << " previous estimate " << oversubscribed << " hasn't changed"; + return; + } + + LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed; - OversubscribeResourcesMessage message; + UpdateSlaveMessage message; message.mutable_slave_id()->CopyFrom(info.id()); - message.mutable_resources()->CopyFrom(oversubscribableResources.get()); + message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed); + CHECK_SOME(master); send(master.get(), message); - delay(flags.oversubscribe_resources_interval, + delay(flags.oversubscribed_resources_interval, self(), - &Self::forwardOversubscribableResources); + &Self::forwardOversubscribedResources); + + // Update the estimate. 
+ oversubscribedResources = oversubscribed; } http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 0207eaf..245ea06 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -434,7 +434,7 @@ private: const Executor* executor); void updateOversubscribableResources(const Future<Resources>& future); - void forwardOversubscribableResources(); + void forwardOversubscribedResources(); const Flags flags; @@ -510,7 +510,11 @@ private: // The most recent estimation about the maximum amount of resources // that can be oversubscribed on the slave. - Option<Resources> oversubscribableResources; + Resources oversubscribableResources; + + // The total amount of oversubscribed (allocated and + // oversubscribable) resources. + Resources oversubscribedResources; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/tests/oversubscription_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp index 75c25b0..36a6793 100644 --- a/src/tests/oversubscription_tests.cpp +++ b/src/tests/oversubscription_tests.cpp @@ -48,8 +48,8 @@ class OversubscriptionSlaveTest : public MesosTest {}; // This test verifies that slave will forward the estimation of the -// oversubscribable resources to the master. -TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage) +// oversubscribed resources to the master. 
+TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage) { Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); @@ -66,13 +66,13 @@ TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage) AWAIT_READY(slaveRegistered); - Future<OversubscribeResourcesMessage> update = - FUTURE_PROTOBUF(OversubscribeResourcesMessage(), _, _); + Future<UpdateSlaveMessage> update = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); Clock::pause(); Clock::settle(); - Clock::advance(flags.oversubscribe_resources_interval); + Clock::advance(flags.oversubscribed_resources_interval); ASSERT_FALSE(update.isReady()); @@ -81,10 +81,10 @@ TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage) resourceEstimator.estimate(resources); Clock::settle(); - Clock::advance(flags.oversubscribe_resources_interval); + Clock::advance(flags.oversubscribed_resources_interval); AWAIT_READY(update); - EXPECT_EQ(Resources(update.get().resources()), resources); + EXPECT_EQ(Resources(update.get().oversubscribed_resources()), resources); Shutdown(); }
