Repository: mesos Updated Branches: refs/heads/master f15d37ce5 -> e530b4e43
Used the pull model to get estimations from resource estimator. Review: https://reviews.apache.org/r/34559 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e530b4e4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e530b4e4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e530b4e4 Branch: refs/heads/master Commit: e530b4e435d0d9a135701a229f376724fe90f10a Parents: f15d37c Author: Jie Yu <[email protected]> Authored: Thu May 21 12:04:32 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Thu May 21 16:01:36 2015 -0700 ---------------------------------------------------------------------- include/mesos/slave/resource_estimator.hpp | 25 ++++++++------- src/slave/resource_estimator.cpp | 41 ++++++++++++++----------- src/slave/resource_estimator.hpp | 4 +-- src/slave/slave.cpp | 27 ++++++++++------ src/slave/slave.hpp | 2 +- src/tests/mesos.hpp | 14 ++++++--- 6 files changed, 67 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/include/mesos/slave/resource_estimator.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp index d64c698..e7f5dec 100644 --- a/include/mesos/slave/resource_estimator.hpp +++ b/include/mesos/slave/resource_estimator.hpp @@ -25,7 +25,6 @@ #include <process/future.hpp> -#include <stout/lambda.hpp> #include <stout/nothing.hpp> #include <stout/option.hpp> #include <stout/try.hpp> @@ -46,17 +45,21 @@ public: virtual ~ResourceEstimator() {} - // Initializes this resource estimator. It registers a callback with - // the resource estimator. The callback allows the resource - // estimator to tell the slave about the current estimation of the - // *maximum* amount of resources that can be oversubscribed on the - // slave. A new estimation will invalidate all the previously - // returned estimations. The slave will keep track of the most - // recent estimation and periodically send it to the master. - // + // Initializes this resource estimator. This method needs to be + // called before any other member method is called. // TODO(jieyu): Pass ResourceMonitor* once it's exposed. - virtual Try<Nothing> initialize( - const lambda::function<void(const Resources&)>& oversubscribe) = 0; + virtual Try<Nothing> initialize() = 0; + + // Returns the current estimation about the *maximum* amount of + // resources that can be oversubscribed on the slave. A new + // estimation will invalidate all the previously returned + // estimations. The slave will be calling this method continuously + // to keep track of the most up-to-date estimation and periodically + // forward it to the master. As a result, to avoid overwhelming the + // slave, it is recommended that the resource estimator should + // return an estimation only if the current estimation is + // significantly different from the previous one. + virtual process::Future<Resources> oversubscribable() = 0; }; } // namespace slave { http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/resource_estimator.cpp ---------------------------------------------------------------------- diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp index 7b7b499..a67640c 100644 --- a/src/slave/resource_estimator.cpp +++ b/src/slave/resource_estimator.cpp @@ -16,7 +16,6 @@ * limitations under the License. */ -#include <process/delay.hpp> #include <process/dispatch.hpp> #include <process/process.hpp> @@ -53,25 +52,20 @@ class NoopResourceEstimatorProcess : public Process<NoopResourceEstimatorProcess> { public: - NoopResourceEstimatorProcess( - const lambda::function<void(const Resources&)>& _oversubscribe) - : oversubscribe(_oversubscribe) {} + NoopResourceEstimatorProcess() : sent(false) {} -protected: - virtual void initialize() + Future<Resources> oversubscribable() { - notify(); - } - - // Periodically notify the slave about oversubscribable resources. - void notify() - { - oversubscribe(Resources()); + if (!sent) { + sent = true; + return Resources(); + } - delay(Seconds(1), self(), &Self::notify); + return Future<Resources>(); } - const lambda::function<void(const Resources&)> oversubscribe; +private: + bool sent; }; @@ -84,19 +78,30 @@ NoopResourceEstimator::~NoopResourceEstimator() } -Try<Nothing> NoopResourceEstimator::initialize( - const lambda::function<void(const Resources&)>& oversubscribe) +Try<Nothing> NoopResourceEstimator::initialize() { if (process.get() != NULL) { return Error("Noop resource estimator has already been initialized"); } - process.reset(new NoopResourceEstimatorProcess(oversubscribe)); + process.reset(new NoopResourceEstimatorProcess()); spawn(process.get()); return Nothing(); } + +Future<Resources> NoopResourceEstimator::oversubscribable() +{ + if (process.get() == NULL) { + return Failure("Noop resource estimator is not initialized"); + } + + return dispatch( + process.get(), + &NoopResourceEstimatorProcess::oversubscribable); +} + } // namespace slave { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/resource_estimator.hpp ---------------------------------------------------------------------- diff --git a/src/slave/resource_estimator.hpp b/src/slave/resource_estimator.hpp index 5a6367c..717804d 100644 --- a/src/slave/resource_estimator.hpp +++ b/src/slave/resource_estimator.hpp @@ -39,8 +39,8 @@ class NoopResourceEstimator : public mesos::slave::ResourceEstimator public: virtual ~NoopResourceEstimator(); - virtual Try<Nothing> initialize( - const lambda::function<void(const Resources&)>& oversubscribe); + virtual Try<Nothing> initialize(); + virtual process::Future<Resources> oversubscribable(); protected: process::Owned<NoopResourceEstimatorProcess> process; http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 8e88482..b4d2029 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -324,9 +324,7 @@ void Slave::initialize() } // TODO(jieyu): Pass ResourceMonitor* to 'initialize'. - Try<Nothing> initialize = resourceEstimator->initialize( - defer(self(), &Self::updateOversubscribableResources, lambda::_1)); - + Try<Nothing> initialize = resourceEstimator->initialize(); if (initialize.isError()) { EXIT(1) << "Failed to initialize the resource estimator: " << initialize.error(); @@ -3980,8 +3978,11 @@ void Slave::__recover(const Future<Nothing>& future) if (flags.recover == "reconnect") { state = DISCONNECTED; - // Start to send updates about oversubscribable resources. - forwardOversubscribableResources(); + // Start to get estimations from the resource estimator and + // forward the estimations to the master. + resourceEstimator->oversubscribable() + .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1)) + .onAny(defer(self(), &Self::forwardOversubscribableResources)); // Start detecting masters. detection = detector->detect() @@ -4072,12 +4073,20 @@ Future<Nothing> Slave::garbageCollect(const string& path) } -void Slave::updateOversubscribableResources(const Resources& resources) +void Slave::updateOversubscribableResources(const Future<Resources>& future) { - LOG(INFO) << "Received a new estimation of the oversubscribable " - << "resources " << resources; + if (!future.isReady()) { + LOG(ERROR) << "Failed to estimate oversubscribable resources: " + << (future.isFailed() ? future.failure() : "discarded"); + } else { + LOG(INFO) << "Received a new estimation of the oversubscribable " + << "resources " << future.get(); + + oversubscribableResources = future.get(); + } - oversubscribableResources = resources; + resourceEstimator->oversubscribable() + .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index d82b10c..0207eaf 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -433,7 +433,7 @@ private: const FrameworkID& frameworkId, const Executor* executor); - void updateOversubscribableResources(const Resources& resources); + void updateOversubscribableResources(const Future<Resources>& future); void forwardOversubscribableResources(); const Flags flags; http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index a60df75..924b0ff 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -38,6 +38,7 @@ #include <process/owned.hpp> #include <process/pid.hpp> #include <process/process.hpp> +#include <process/queue.hpp> #include <stout/bytes.hpp> #include <stout/foreach.hpp> @@ -704,20 +705,23 @@ public: class TestResourceEstimator : public mesos::slave::ResourceEstimator { public: - virtual Try<Nothing> initialize( - const lambda::function<void(const Resources&)>& _oversubscribe) + virtual Try<Nothing> initialize() { - oversubscribe = _oversubscribe; return Nothing(); } + virtual process::Future<Resources> oversubscribable() + { + return queue.get(); + } + void estimate(const Resources& resources) { - oversubscribe(resources); + queue.put(resources); } private: - lambda::function<void(const Resources&)> oversubscribe; + process::Queue<Resources> queue; };
