Implemented 'updateSlave()' call in the master. Review: https://reviews.apache.org/r/34736
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fbf5c7e7 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fbf5c7e7 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fbf5c7e7 Branch: refs/heads/master Commit: fbf5c7e703c691f8b8bcf20ea7c324e9987beab1 Parents: 949e6ad Author: Vinod Kone <[email protected]> Authored: Wed May 27 16:07:59 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu May 28 17:11:04 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 53 ++++++++- src/tests/oversubscription_tests.cpp | 172 +++++++++++++++++++++++++++++- 2 files changed, 219 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index d61b77b..710b814 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3463,11 +3463,52 @@ void Master::updateSlave( { ++metrics->messages_update_slave; - LOG(INFO) << "Received update of slave " << slaveId - << " with oversubscribed resources " << oversubscribedResources; + if (slaves.removed.get(slaveId).isSome()) { + // If the slave is removed, we have already informed + // frameworks that its tasks were LOST, so the slave should + // shut down. 
+ LOG(WARNING) + << "Ignoring update of slave with total oversubscribed resources " + << oversubscribedResources << " on removed slave " << slaveId + << " ; asking slave to shutdown"; + + ShutdownMessage message; + message.set_message("Update slave message from unknown slave"); + reply(message); + return; + } + + if (!slaves.registered.contains(slaveId)) { + LOG(WARNING) + << "Ignoring update of slave with total oversubscribed resources " + << oversubscribedResources << " on unknown slave " << slaveId; + return; + } + + Slave* slave = CHECK_NOTNULL(slaves.registered.get(slaveId)); + + LOG(INFO) << "Received update of slave " << *slave << " with total" + << " oversubscribed resources " << oversubscribedResources; + + // First, rescind any outstanding offers with revocable resources. + // NOTE: Need a copy of offers because the offers are removed inside + // the loop. + foreach (Offer* offer, utils::copy(slave->offers)) { + const Resources offered = offer->resources(); + if (!offered.revocable().empty()) { + LOG(INFO) << "Removing offer " << offer->id() + << " with revocable resources " << offered + << " on slave " << *slave; + + allocator->recoverResources( + offer->framework_id(), offer->slave_id(), offer->resources(), None()); + + removeOffer(offer, true); // Rescind. + } + } - // TODO(vinod): Rescind any oustanding revocable offers from this - // slave and update the allocator. + // Now, update the allocator with the new estimate. + allocator->updateSlave(slaveId, oversubscribedResources); } @@ -3984,6 +4025,10 @@ void Master::offer(const FrameworkID& frameworkId, } #endif // WITH_NETWORK_ISOLATOR + // TODO(vinod): Split regular and revocable resources into + // separate offers, so that rescinding offers with revocable + // resources does not affect offers with regular resources. 
+ Offer* offer = new Offer(); offer->mutable_id()->MergeFrom(newOfferId()); offer->mutable_framework_id()->MergeFrom(framework->id()); http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/tests/oversubscription_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp index 1dda63e..ea5857c 100644 --- a/src/tests/oversubscription_tests.cpp +++ b/src/tests/oversubscription_tests.cpp @@ -16,15 +16,21 @@ * limitations under the License. */ +#include <string> +#include <vector> + #include <gmock/gmock.h> #include <mesos/resources.hpp> #include <process/clock.hpp> +#include <process/future.hpp> #include <process/gtest.hpp> #include <stout/gtest.hpp> +#include "common/resources_utils.hpp" + #include "master/master.hpp" #include "messages/messages.hpp" @@ -41,11 +47,28 @@ using mesos::internal::master::Master; using mesos::internal::slave::Slave; +using std::string; +using std::vector; + namespace mesos { namespace internal { namespace tests { -class OversubscriptionTest : public MesosTest {}; +class OversubscriptionTest : public MesosTest +{ +protected: + // TODO(vinod): Make this a global helper that other tests (e.g., + // hierarchical allocator tests) can use. + Resources createRevocableResources( + const string& name, + const string& value, + const string& role = "*") + { + Resource resource = Resources::parse(name, value, role).get(); + resource.mutable_revocable(); + return resource; + } +}; // This test verifies that slave will forward the estimation of the @@ -79,7 +102,7 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage) ASSERT_FALSE(update.isReady()); // Inject an estimation of oversubscribable resources. 
- Resources resources = Resources::parse("cpus:1;mem:32").get(); + Resources resources = createRevocableResources("cpus", "1"); resourceEstimator.estimate(resources); AWAIT_READY(update); @@ -97,6 +120,151 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage) Shutdown(); } + +// This test verifies that a framework that desires revocable +// resources gets an offer with revocable resources. +TEST_F(OversubscriptionTest, RevocableOffer) +{ + // Start the master. + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Start the slave with test resource estimator. + TestResourceEstimator resourceEstimator; + slave::Flags flags = CreateSlaveFlags(); + + Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags); + ASSERT_SOME(slave); + + // Start the framework which desires revocable resources. + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.add_capabilities()->set_type( + FrameworkInfo::Capability::REVOCABLE_RESOURCES); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers1; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers1)); + + driver.start(); + + // Initially the framework will get all regular resources. + AWAIT_READY(offers1); + EXPECT_NE(0u, offers1.get().size()); + EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty()); + + Future<vector<Offer>> offers2; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers2)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + // Inject an estimation of oversubscribable resources. + Resources resources = createRevocableResources("cpus", "1"); + resourceEstimator.estimate(resources); + + // Now the framework will get revocable resources. 
+ AWAIT_READY(offers2); + EXPECT_NE(0u, offers2.get().size()); + EXPECT_EQ(resources, Resources(offers2.get()[0].resources())); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +// This test verifies that when the master receives a new estimate for +// oversubscribed resources it rescinds outstanding revocable offers. +TEST_F(OversubscriptionTest, RescindRevocableOffer) +{ + // Start the master. + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Start the slave with test resource estimator. + TestResourceEstimator resourceEstimator; + slave::Flags flags = CreateSlaveFlags(); + + Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags); + ASSERT_SOME(slave); + + // Start the framework which desires revocable resources. + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.add_capabilities()->set_type( + FrameworkInfo::Capability::REVOCABLE_RESOURCES); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers1; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers1)); + + driver.start(); + + // Initially the framework will get all regular resources. + AWAIT_READY(offers1); + EXPECT_NE(0u, offers1.get().size()); + EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty()); + + Future<vector<Offer>> offers2; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers2)); + + // Inject an estimation of oversubscribable resources. + Resources resources = createRevocableResources("cpus", "1"); + resourceEstimator.estimate(resources); + + // Now the framework will get revocable resources. 
+ AWAIT_READY(offers2); + EXPECT_NE(0u, offers2.get().size()); + EXPECT_EQ(resources, Resources(offers2.get()[0].resources())); + + Future<OfferID> offerId; + EXPECT_CALL(sched, offerRescinded(&driver, _)) + .WillOnce(FutureArg<1>(&offerId)); + + Future<vector<Offer>> offers3; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers3)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + // Inject another estimation of oversubscribable resources while the + // previous revocable offer is outstanding. + Resources resources2 = createRevocableResources("cpus", "2"); + resourceEstimator.estimate(resources2); + + // Advance the clock for the slave to send the new estimate. + Clock::pause(); + Clock::advance(flags.oversubscribed_resources_interval); + Clock::settle(); + + // The previous revocable offer should be rescinded. + AWAIT_EXPECT_EQ(offers2.get()[0].id(), offerId); + + // Resume the clock for next allocation. + Clock::resume(); + + // The new offer should include the latest oversubscribed resources. + AWAIT_READY(offers3); + EXPECT_NE(0u, offers3.get().size()); + EXPECT_EQ(resources2, Resources(offers3.get()[0].resources())); + + driver.stop(); + driver.join(); + + Shutdown(); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
