Repository: mesos Updated Branches: refs/heads/master 2c320eee9 -> ac7fb6324
Changed to use a push model for resource estimator. Review: https://reviews.apache.org/r/34299 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac7fb632 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac7fb632 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac7fb632 Branch: refs/heads/master Commit: ac7fb6324db60a3cfd417ad87ca4bb9a68457688 Parents: 2c320ee Author: Jie Yu <[email protected]> Authored: Fri May 15 16:35:32 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Mon May 18 12:24:53 2015 -0700 ---------------------------------------------------------------------- include/mesos/slave/resource_estimator.hpp | 27 +++++------ src/messages/messages.proto | 4 +- src/slave/constants.cpp | 6 --- src/slave/constants.hpp | 5 --- src/slave/flags.cpp | 7 +++ src/slave/flags.hpp | 1 + src/slave/resource_estimator.cpp | 37 ++++++++------- src/slave/resource_estimator.hpp | 4 +- src/slave/slave.cpp | 60 ++++++++++++------------- src/slave/slave.hpp | 10 +++-- src/tests/mesos.hpp | 31 ++++++------- src/tests/oversubscription_tests.cpp | 40 +++++++++++++---- 12 files changed, 125 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/include/mesos/slave/resource_estimator.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp index 3639615..d64c698 100644 --- a/include/mesos/slave/resource_estimator.hpp +++ b/include/mesos/slave/resource_estimator.hpp @@ -25,7 +25,7 @@ #include <process/future.hpp> -#include <stout/none.hpp> +#include <stout/lambda.hpp> #include <stout/nothing.hpp> #include <stout/option.hpp> #include <stout/try.hpp> @@ -46,22 +46,17 @@ public: virtual ~ResourceEstimator() {} - // Initializes this resource estimator. This method needs to be - // called before any other member method is called. + // Initializes this resource estimator. It registers a callback with + // the resource estimator. The callback allows the resource + // estimator to tell the slave about the current estimation of the + // *maximum* amount of resources that can be oversubscribed on the + // slave. A new estimation will invalidate all the previously + // returned estimations. The slave will keep track of the most + // recent estimation and periodically send it to the master. + // // TODO(jieyu): Pass ResourceMonitor* once it's exposed. - virtual Try<Nothing> initialize() = 0; - - // Returns the current estimation about the *maximum* amount of - // resources that can be oversubscribed on the slave. A new - // estimation will invalidate all the previously returned - // estimations. The slave will be calling this method continuously - // to get the most up-to-date estimation and forward them to the - // master. As a result, it is up to the resource estimator to - // control the speed of sending estimations to the master. To avoid - // overwhelming the master, it is recommended that the resource - // estimator should return an estimation only if the current - // estimation is significantly different from the previous one. - virtual process::Future<Resources> oversubscribed() = 0; + virtual Try<Nothing> initialize( + const lambda::function<void(const Resources&)>& oversubscribe) = 0; }; } // namespace slave { http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 19e2444..c946754 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -334,8 +334,8 @@ message CheckpointResourcesMessage { // This message is sent by the slave to the master to inform the -// master about the currently available oversubscribed resources. -message UpdateOversubscribedResourcesMessage { +// master about the currently oversubscribable resources. +message OversubscribeResourcesMessage { required SlaveID slave_id = 1; repeated Resource resources = 2; } http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/constants.cpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp index 07f699a..2a99b11 100644 --- a/src/slave/constants.cpp +++ b/src/slave/constants.cpp @@ -57,12 +57,6 @@ Duration MASTER_PING_TIMEOUT() return master::SLAVE_PING_TIMEOUT * master::MAX_SLAVE_PING_TIMEOUTS; } - -Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN() -{ - return Seconds(5); -} - } // namespace slave { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/constants.hpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp index df02043..fd1c1ab 100644 --- a/src/slave/constants.hpp +++ b/src/slave/constants.hpp @@ -104,11 +104,6 @@ extern const std::string DEFAULT_AUTHENTICATEE; // trigger a re-detection of the master to cause a re-registration. Duration MASTER_PING_TIMEOUT(); - -// To avoid overwhelming the master, we enforce a minimal delay -// between two subsequent UpdateOversubscribedResourcesMessages. -Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN(); - } // namespace slave { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/flags.cpp ---------------------------------------------------------------------- diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp index da30973..b5e2518 100644 --- a/src/slave/flags.cpp +++ b/src/slave/flags.cpp @@ -440,4 +440,11 @@ mesos::internal::slave::Flags::Flags() add(&Flags::resource_estimator, "resource_estimator", "The name of the resource estimator to use for oversubscription."); + + add(&Flags::oversubscribe_resources_interval, + "oversubscribe_resources_interval", + "The slave periodically updates the master with the current estimation\n" + "about the maximum amount of resources that can be oversubscribed. The\n" + "interval between updates is controlled by this flag.", + Seconds(15)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/flags.hpp ---------------------------------------------------------------------- diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp index ca7cc13..5c57478 100644 --- a/src/slave/flags.hpp +++ b/src/slave/flags.hpp @@ -98,6 +98,7 @@ public: std::string authenticatee; Option<std::string> hooks; Option<std::string> resource_estimator; + Duration oversubscribe_resources_interval; }; } // namespace slave { http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/resource_estimator.cpp ---------------------------------------------------------------------- diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp index 13d706c..7b7b499 100644 --- a/src/slave/resource_estimator.cpp +++ b/src/slave/resource_estimator.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ +#include <process/delay.hpp> #include <process/dispatch.hpp> #include <process/process.hpp> @@ -52,10 +53,25 @@ class NoopResourceEstimatorProcess : public Process<NoopResourceEstimatorProcess> { public: - Future<Resources> oversubscribed() + NoopResourceEstimatorProcess( + const lambda::function<void(const Resources&)>& _oversubscribe) + : oversubscribe(_oversubscribe) {} + +protected: + virtual void initialize() + { + notify(); + } + + // Periodically notify the slave about oversubscribable resources. + void notify() { - return Resources(); + oversubscribe(Resources()); + + delay(Seconds(1), self(), &Self::notify); } + + const lambda::function<void(const Resources&)> oversubscribe; }; @@ -68,30 +84,19 @@ NoopResourceEstimator::~NoopResourceEstimator() } -Try<Nothing> NoopResourceEstimator::initialize() +Try<Nothing> NoopResourceEstimator::initialize( + const lambda::function<void(const Resources&)>& oversubscribe) { if (process.get() != NULL) { return Error("Noop resource estimator has already been initialized"); } - process.reset(new NoopResourceEstimatorProcess()); + process.reset(new NoopResourceEstimatorProcess(oversubscribe)); spawn(process.get()); return Nothing(); } - -Future<Resources> NoopResourceEstimator::oversubscribed() -{ - if (process.get() == NULL) { - return Failure("Noop resource estimator is not initialized"); - } - - return dispatch( - process.get(), - &NoopResourceEstimatorProcess::oversubscribed); -} - } // namespace slave { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/resource_estimator.hpp ---------------------------------------------------------------------- diff --git a/src/slave/resource_estimator.hpp b/src/slave/resource_estimator.hpp index bdf62ba..5a6367c 100644 --- a/src/slave/resource_estimator.hpp +++ b/src/slave/resource_estimator.hpp @@ -39,8 +39,8 @@ class NoopResourceEstimator : public mesos::slave::ResourceEstimator public: virtual ~NoopResourceEstimator(); - virtual Try<Nothing> initialize(); - virtual process::Future<Resources> oversubscribed(); + virtual Try<Nothing> initialize( + const lambda::function<void(const Resources&)>& oversubscribe); protected: process::Owned<NoopResourceEstimatorProcess> process; http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 132f83e..8e88482 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -324,7 +324,9 @@ void Slave::initialize() } // TODO(jieyu): Pass ResourceMonitor* to 'initialize'. - Try<Nothing> initialize = resourceEstimator->initialize(); + Try<Nothing> initialize = resourceEstimator->initialize( + defer(self(), &Self::updateOversubscribableResources, lambda::_1)); + if (initialize.isError()) { EXIT(1) << "Failed to initialize the resource estimator: " << initialize.error(); @@ -3978,8 +3980,8 @@ void Slave::__recover(const Future<Nothing>& future) if (flags.recover == "reconnect") { state = DISCONNECTED; - // Start to detect available oversubscribed resources. - updateOversubscribedResources(); + // Start to send updates about oversubscribable resources. + forwardOversubscribableResources(); // Start detecting masters. detection = detector->detect() @@ -4070,45 +4072,43 @@ Future<Nothing> Slave::garbageCollect(const string& path) } -void Slave::updateOversubscribedResources() +void Slave::updateOversubscribableResources(const Resources& resources) { - // TODO(jieyu): Consider switching to a push model in which the - // slave registers a callback with the resource estimator, and the - // resource estimator invokes the callback whenever a new estimation - // is ready (similar to the allocator/master interface). + LOG(INFO) << "Received a new estimation of the oversubscribable " + << "resources " << resources; + + oversubscribableResources = resources; +} + +void Slave::forwardOversubscribableResources() +{ if (state != RUNNING) { - delay(Seconds(1), self(), &Self::updateOversubscribedResources); + delay(Seconds(1), self(), &Self::forwardOversubscribableResources); return; } - resourceEstimator->oversubscribed() - .onAny(defer(self(), &Slave::_updateOversubscribedResources, lambda::_1)); -} - + // We only forward updates after the first estimation is received. + if (oversubscribableResources.isNone()) { + delay(Seconds(1), self(), &Self::forwardOversubscribableResources); + return; + } -void Slave::_updateOversubscribedResources(const Future<Resources>& future) -{ - if (!future.isReady()) { - LOG(ERROR) << "Failed to estimate oversubscribed resources: " - << (future.isFailed() ? future.failure() : "discarded"); - } else if (state == RUNNING) { - CHECK_SOME(master); + CHECK_SOME(master); + CHECK_SOME(oversubscribableResources); - LOG(INFO) << "Updating available oversubscribed resources to " - << future.get(); + LOG(INFO) << "Forwarding oversubscribable resources " + << oversubscribableResources.get(); - UpdateOversubscribedResourcesMessage message; - message.mutable_slave_id()->CopyFrom(info.id()); - message.mutable_resources()->CopyFrom(future.get()); + OversubscribeResourcesMessage message; + message.mutable_slave_id()->CopyFrom(info.id()); + message.mutable_resources()->CopyFrom(oversubscribableResources.get()); - send(master.get(), message); - } + send(master.get(), message); - // TODO(jieyu): Consider making the interval configurable. - delay(UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN(), + delay(flags.oversubscribe_resources_interval, self(), - &Self::updateOversubscribedResources); + &Self::forwardOversubscribableResources); } http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index b62ed7b..d82b10c 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -433,10 +433,8 @@ private: const FrameworkID& frameworkId, const Executor* executor); - // Polls oversubscribed resources estimations from resources - // estimator and forwards estimations to the master. - void updateOversubscribedResources(); - void _updateOversubscribedResources(const process::Future<Resources>& future); + void updateOversubscribableResources(const Resources& resources); + void forwardOversubscribableResources(); const Flags flags; @@ -509,6 +507,10 @@ private: Duration executorDirectoryMaxAllowedAge; mesos::slave::ResourceEstimator* resourceEstimator; + + // The most recent estimation about the maximum amount of resources + // that can be oversubscribed on the slave. + Option<Resources> oversubscribableResources; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index df8cd20..a60df75 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -30,6 +30,8 @@ #include <mesos/master/allocator.hpp> +#include <mesos/slave/resource_estimator.hpp> + #include <process/future.hpp> #include <process/gmock.hpp> #include <process/gtest.hpp> @@ -699,28 +701,23 @@ public: }; -class MockResourceEstimator : public mesos::slave::ResourceEstimator +class TestResourceEstimator : public mesos::slave::ResourceEstimator { public: - MockResourceEstimator() + virtual Try<Nothing> initialize( + const lambda::function<void(const Resources&)>& _oversubscribe) { - // NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of - // 'ON_CALL' and 'WillByDefault'. See 'TestContainerizer::SetUp()' - // for more details. - EXPECT_CALL(*this, initialize()) - .WillRepeatedly(Return(Nothing())); - - EXPECT_CALL(*this, oversubscribed()) - .WillRepeatedly(Return(Resources())); + oversubscribe = _oversubscribe; + return Nothing(); } - MOCK_METHOD0( - initialize, - Try<Nothing>()); + void estimate(const Resources& resources) + { + oversubscribe(resources); + } - MOCK_METHOD0( - oversubscribed, - process::Future<Resources>()); +private: + lambda::function<void(const Resources&)> oversubscribe; }; @@ -787,7 +784,7 @@ public: private: Files files; MockGarbageCollector gc; - MockResourceEstimator resourceEstimator; + TestResourceEstimator resourceEstimator; slave::StatusUpdateManager* statusUpdateManager; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/tests/oversubscription_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp index 64c2ede..75c25b0 100644 --- a/src/tests/oversubscription_tests.cpp +++ b/src/tests/oversubscription_tests.cpp @@ -18,6 +18,9 @@ #include <gmock/gmock.h> +#include <mesos/resources.hpp> + +#include <process/clock.hpp> #include <process/gtest.hpp> #include <stout/gtest.hpp> @@ -45,24 +48,43 @@ class OversubscriptionSlaveTest : public MesosTest {}; // This test verifies that slave will forward the estimation of the -// available oversubscribed resources to the master. -TEST_F(OversubscriptionSlaveTest, UpdateOversubcribedResourcesMessage) +// oversubscribable resources to the master. +TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage) { Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - Future<UpdateOversubscribedResourcesMessage> message = - FUTURE_PROTOBUF(UpdateOversubscribedResourcesMessage(), _, _); + Future<SlaveRegisteredMessage> slaveRegistered = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); - MockResourceEstimator resourceEstimator; + TestResourceEstimator resourceEstimator; - EXPECT_CALL(resourceEstimator, oversubscribed()) - .WillRepeatedly(Return(Resources())); + slave::Flags flags = CreateSlaveFlags(); - Try<PID<Slave>> slave = StartSlave(&resourceEstimator); + Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags); ASSERT_SOME(slave); - AWAIT_READY(message); + AWAIT_READY(slaveRegistered); + + Future<OversubscribeResourcesMessage> update = + FUTURE_PROTOBUF(OversubscribeResourcesMessage(), _, _); + + Clock::pause(); + + Clock::settle(); + Clock::advance(flags.oversubscribe_resources_interval); + + ASSERT_FALSE(update.isReady()); + + // Inject an estimation of oversubscribable resources. + Resources resources = Resources::parse("cpus:1;mem:32").get(); + resourceEstimator.estimate(resources); + + Clock::settle(); + Clock::advance(flags.oversubscribe_resources_interval); + + AWAIT_READY(update); + EXPECT_EQ(Resources(update.get().resources()), resources); Shutdown(); }
