Made the GarbageCollector injectable into the Slave. Review: https://reviews.apache.org/r/25372
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1071d3e3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1071d3e3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1071d3e3 Branch: refs/heads/master Commit: 1071d3e39ce558aaf541475f8786e4a7003752e0 Parents: 00024c3 Author: Benjamin Mahler <[email protected]> Authored: Thu Sep 4 10:23:51 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Sep 10 11:05:35 2014 -0700 ---------------------------------------------------------------------- src/local/local.cpp | 21 +- src/slave/gc.hpp | 10 +- src/slave/main.cpp | 6 +- src/slave/slave.cpp | 10 +- src/slave/slave.hpp | 5 +- src/tests/cluster.hpp | 311 +++++++------------------- src/tests/fault_tolerance_tests.cpp | 1 + src/tests/master_authorization_tests.cpp | 66 +++--- src/tests/mesos.cpp | 37 ++- src/tests/mesos.hpp | 58 +++++ src/tests/reconciliation_tests.cpp | 6 +- 11 files changed, 246 insertions(+), 285 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/local/local.cpp ---------------------------------------------------------------------- diff --git a/src/local/local.cpp b/src/local/local.cpp index 9db4dd1..b8a481d 100644 --- a/src/local/local.cpp +++ b/src/local/local.cpp @@ -50,6 +50,7 @@ #include "master/repairer.hpp" #include "slave/containerizer/containerizer.hpp" +#include "slave/gc.hpp" #include "slave/slave.hpp" #include "state/in_memory.hpp" @@ -70,6 +71,7 @@ using mesos::internal::master::Registrar; using mesos::internal::master::Repairer; using mesos::internal::slave::Containerizer; +using mesos::internal::slave::GarbageCollector; using mesos::internal::slave::Slave; using process::Owned; @@ -100,6 +102,7 @@ static StandaloneMasterDetector* detector = NULL; static MasterContender* contender = NULL; static Option<Authorizer*> authorizer = None(); static 
Files* files = NULL; +static vector<GarbageCollector*>* garbageCollectors = NULL; PID<Master> launch(const Flags& flags, Allocator* _allocator) @@ -193,6 +196,8 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator) PID<Master> pid = process::spawn(master); + garbageCollectors = new vector<GarbageCollector*>(); + vector<UPID> pids; for (int i = 0; i < flags.num_slaves; i++) { @@ -204,6 +209,8 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator) << "slave flags from the environment: " << load.error(); } + garbageCollectors->push_back(new GarbageCollector()); + Try<Containerizer*> containerizer = Containerizer::create(flags, true); if (containerizer.isError()) { EXIT(1) << "Failed to create a containerizer: " << containerizer.error(); @@ -214,7 +221,12 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator) // NOTE: At this point detector is already initialized by the // Master. - Slave* slave = new Slave(flags, detector, containerizer.get(), files); + Slave* slave = new Slave( + flags, + detector, + containerizer.get(), + files, + garbageCollectors->back()); slaves[containerizer.get()] = slave; pids.push_back(process::spawn(slave)); } @@ -262,6 +274,13 @@ void shutdown() delete files; files = NULL; + foreach (GarbageCollector* gc, *garbageCollectors) { + delete gc; + } + + delete garbageCollectors; + garbageCollectors = NULL; + delete registrar; registrar = NULL; http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/slave/gc.hpp ---------------------------------------------------------------------- diff --git a/src/slave/gc.hpp b/src/slave/gc.hpp index 8f9398b..780f9c9 100644 --- a/src/slave/gc.hpp +++ b/src/slave/gc.hpp @@ -52,7 +52,7 @@ class GarbageCollector { public: GarbageCollector(); - ~GarbageCollector(); + virtual ~GarbageCollector(); // Schedules the specified path for removal after the specified // duration of time has elapsed. If the path is already scheduled, @@ -64,18 +64,20 @@ public: // was rescheduled. 
// Note that you currently cannot discard a returned future, instead // you must call unschedule. - process::Future<Nothing> schedule(const Duration& d, const std::string& path); + virtual process::Future<Nothing> schedule( + const Duration& d, + const std::string& path); // Unschedules the specified path for removal. // The future will be true if the path has been unscheduled. // The future will be false if the path is not scheduled for // removal, or the path has already being removed. // Note that you currently cannot discard a returned future. - process::Future<bool> unschedule(const std::string& path); + virtual process::Future<bool> unschedule(const std::string& path); // Deletes all the directories, whose scheduled garbage collection time // is within the next 'd' duration of time. - void prune(const Duration& d); + virtual void prune(const Duration& d); private: GarbageCollectorProcess* process; http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/slave/main.cpp ---------------------------------------------------------------------- diff --git a/src/slave/main.cpp b/src/slave/main.cpp index 319316f..2c4d365 100644 --- a/src/slave/main.cpp +++ b/src/slave/main.cpp @@ -33,6 +33,7 @@ #include "logging/logging.hpp" +#include "slave/gc.hpp" #include "slave/slave.hpp" using namespace mesos::internal; @@ -149,7 +150,10 @@ int main(int argc, char** argv) LOG(INFO) << "Starting Mesos slave"; Files files; - Slave* slave = new Slave(flags, detector.get(), containerizer.get(), &files); + GarbageCollector gc; + + Slave* slave = + new Slave(flags, detector.get(), containerizer.get(), &files, &gc); process::spawn(slave); process::wait(slave->self()); http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 9536a3b..f1df9d1 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -103,7 +103,8 @@ using 
namespace state; Slave::Slave(const slave::Flags& _flags, MasterDetector* _detector, Containerizer* _containerizer, - Files* _files) + Files* _files, + GarbageCollector* _gc) : ProcessBase(process::ID::generate("slave")), state(RECOVERING), http(this), @@ -113,6 +114,7 @@ Slave::Slave(const slave::Flags& _flags, containerizer(_containerizer), files(_files), metrics(*this), + gc(_gc), monitor(containerizer), statusUpdateManager(new StatusUpdateManager()), metaDir(paths::getMetaRootDir(flags.work_dir)), @@ -1000,7 +1002,7 @@ void Slave::doReliableRegistration(const Duration& duration) // TODO(vinod): Can we avoid this helper? Future<bool> Slave::unschedule(const string& path) { - return gc.unschedule(path); + return gc->unschedule(path); } @@ -3077,7 +3079,7 @@ void Slave::_checkDiskUsage(const Future<double>& usage) // the next 'gc_delay - age'. Since a directory is always // scheduled for deletion 'gc_delay' into the future, only directories // that are at least 'age' old are deleted. - gc.prune(flags.gc_delay - age(usage.get())); + gc->prune(flags.gc_delay - age(usage.get())); } delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage); } @@ -3335,7 +3337,7 @@ Future<Nothing> Slave::garbageCollect(const string& path) // GC based on the modification time. 
Duration delay = flags.gc_delay - (Clock::now() - time.get()); - return gc.schedule(delay, path); + return gc->schedule(delay, path); } http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index d8c7ee4..a418536 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -89,7 +89,8 @@ public: Slave(const Flags& flags, MasterDetector* detector, Containerizer* containerizer, - Files* files); + Files* files, + GarbageCollector* gc); virtual ~Slave(); @@ -438,7 +439,7 @@ private: process::Time startTime; - GarbageCollector gc; + GarbageCollector* gc; ResourceMonitor monitor; StatusUpdateManager* statusUpdateManager; http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/cluster.hpp ---------------------------------------------------------------------- diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp index d857fc6..aad4275 100644 --- a/src/tests/cluster.hpp +++ b/src/tests/cluster.hpp @@ -62,6 +62,7 @@ #include "master/repairer.hpp" #include "slave/flags.hpp" +#include "slave/gc.hpp" #include "slave/containerizer/containerizer.hpp" #include "slave/slave.hpp" @@ -92,33 +93,11 @@ public: void shutdown(); - // Start and manage a new master using the specified flags. - // This overload is shorthand to specify that you want default - // master objects and is equivalent to passing None to all of - // the required arguments of the other overload. + // Start a new master with the provided flags and injections. Try<process::PID<master::Master> > start( - const master::Flags& flags = master::Flags()); - - // Start and manage a new master using the specified allocator - // process. - Try<process::PID<master::Master> > start( - master::allocator::AllocatorProcess* allocatorProcess, - const master::Flags& flags = master::Flags()); - - // Start and manage a new master using the specified authorizer. 
- Try<process::PID<master::Master> > start( - Authorizer* authorizer, - const master::Flags& flags = master::Flags()); - - // Start and manage a new master using the specified flags. - // An allocator process or authorizer may be specified in which - // case it will outlive the launched master. If either allocator - // process or authorizer is not specified then the default - // allocator or authorizer will be used. - Try<process::PID<master::Master> > start( - const Option<master::allocator::AllocatorProcess*>& allocatorProcess, - const Option<Authorizer*>& authorizer, - const master::Flags& flags = master::Flags()); + const master::Flags& flags = master::Flags(), + const Option<master::allocator::AllocatorProcess*>& allocator = None(), + const Option<Authorizer*>& authorizer = None()); // Stops and cleans up a master at the specified PID. Try<Nothing> stop(const process::PID<master::Master>& pid); @@ -137,30 +116,24 @@ public: // Encapsulates a single master's dependencies. struct Master { - Master() - : master(NULL), - allocator(NULL), - allocatorProcess(NULL), - log(NULL), - storage(NULL), - state(NULL), - registrar(NULL), - repairer(NULL), - contender(NULL), - detector(NULL), - authorizer(None()) {} + Master() : master(NULL) {} + + process::Owned<master::allocator::AllocatorProcess> allocatorProcess; + process::Owned<master::allocator::Allocator> allocator; + + process::Owned<log::Log> log; + process::Owned<state::Storage> storage; + process::Owned<state::protobuf::State> state; + process::Owned<master::Registrar> registrar; + + process::Owned<master::Repairer> repairer; + + process::Owned<MasterContender> contender; + process::Owned<MasterDetector> detector; + + process::Owned<Authorizer> authorizer; master::Master* master; - master::allocator::Allocator* allocator; - master::allocator::AllocatorProcess* allocatorProcess; - log::Log* log; - state::Storage* storage; - state::protobuf::State* state; - master::Registrar* registrar; - master::Repairer* repairer; 
- MasterContender* contender; - MasterDetector* detector; - Option<Authorizer*> authorizer; }; std::map<process::PID<master::Master>, Master> masters; @@ -176,29 +149,12 @@ public: // Stop and clean up all slaves. void shutdown(); - // Start and manage a new slave with a process isolator using the - // specified flags. - Try<process::PID<slave::Slave> > start( - const slave::Flags& flags = slave::Flags()); - - // Start and manage a new slave injecting the specified isolator. - // The isolator is expected to outlive the launched slave (i.e., - // until it is stopped via Slaves::stop). + // Start a new slave with the provided flags and injections. Try<process::PID<slave::Slave> > start( - slave::Containerizer* containerizer, - const slave::Flags& flags = slave::Flags()); - - // Start and manage a new slave injecting the specified Master - // Detector. The detector is expected to outlive the launched - // slave (i.e., until it is stopped via Slaves::stop). - Try<process::PID<slave::Slave> > start( - const Option<MasterDetector*>& detector, - const slave::Flags& flags = slave::Flags()); - - Try<process::PID<slave::Slave> > start( - slave::Containerizer* containerizer, - const Option<MasterDetector*>& detector, - const slave::Flags& flags = slave::Flags()); + const slave::Flags& flags = slave::Flags(), + const Option<slave::Containerizer*>& containerizer = None(), + const Option<MasterDetector*>& detector = None(), + const Option<slave::GarbageCollector*>& gc = None()); // Stops and cleans up a slave at the specified PID. If 'shutdown' // is true than the slave is sent a shutdown message instead of @@ -221,19 +177,15 @@ public: Slave() : containerizer(NULL), createdContainerizer(false), - slave(NULL), - detector(NULL) {} + slave(NULL) {} - // Register the slave's containerizer here. slave::Containerizer* containerizer; - // Record if we created the containerizer so we know to delete it when - // stopping the slave. 
- bool createdContainerizer; - slave::Slave* slave; - process::Owned<MasterDetector> detector; + bool createdContainerizer; // Whether we own the containerizer. - // Set to the slave::flags used for the slave. + process::Owned<slave::GarbageCollector> gc; + process::Owned<MasterDetector> detector; slave::Flags flags; + slave::Slave* slave; }; std::map<process::PID<slave::Slave>, Slave> slaves; @@ -284,32 +236,9 @@ inline void Cluster::Masters::shutdown() inline Try<process::PID<master::Master> > Cluster::Masters::start( - const master::Flags& flags) -{ - return start(None(), None(), flags); -} - - -inline Try<process::PID<master::Master> > Cluster::Masters::start( - master::allocator::AllocatorProcess* allocator, - const master::Flags& flags) -{ - return start(allocator, None(), flags); -} - - -inline Try<process::PID<master::Master> > Cluster::Masters::start( - Authorizer* authorizer, - const master::Flags& flags) -{ - return start(None(), authorizer, flags); -} - - -inline Try<process::PID<master::Master> > Cluster::Masters::start( + const master::Flags& flags, const Option<master::allocator::AllocatorProcess*>& allocatorProcess, - const Option<Authorizer*>& authorizer, - const master::Flags& flags) + const Option<Authorizer*>& authorizer) { // Disallow multiple masters when not using ZooKeeper. 
if (!masters.empty() && url.isNone()) { @@ -319,14 +248,13 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start( Master master; if (allocatorProcess.isNone()) { - master.allocatorProcess = - new master::allocator::HierarchicalDRFAllocatorProcess(); - master.allocator = - new master::allocator::Allocator(master.allocatorProcess); + master.allocatorProcess.reset( + new master::allocator::HierarchicalDRFAllocatorProcess()); + master.allocator.reset( + new master::allocator::Allocator(master.allocatorProcess.get())); } else { - master.allocatorProcess = NULL; - master.allocator = - new master::allocator::Allocator(allocatorProcess.get()); + master.allocator.reset( + new master::allocator::Allocator(allocatorProcess.get())); } if (flags.registry == "in_memory") { @@ -335,7 +263,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start( "Cannot use '--registry_strict' when using in-memory storage based" " registry"); } - master.storage = new state::InMemoryStorage(); + master.storage.reset(new state::InMemoryStorage()); } else if (flags.registry == "replicated_log") { if (flags.work_dir.isNone()) { return Error( @@ -349,40 +277,40 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start( "Need to specify --quorum for replicated log based registry" " when using ZooKeeper"); } - master.log = new log::Log( + master.log.reset(new log::Log( flags.quorum.get(), path::join(flags.work_dir.get(), "replicated_log"), url.get().servers, flags.zk_session_timeout, path::join(url.get().path, "log_replicas"), url.get().authentication, - flags.log_auto_initialize); + flags.log_auto_initialize)); } else { - master.log = new log::Log( + master.log.reset(new log::Log( 1, path::join(flags.work_dir.get(), "replicated_log"), std::set<process::UPID>(), - flags.log_auto_initialize); + flags.log_auto_initialize)); } - master.storage = new state::LogStorage(master.log); + master.storage.reset(new state::LogStorage(master.log.get())); } else { return 
Error("'" + flags.registry + "' is not a supported option for" " registry persistence"); } - CHECK_NOTNULL(master.storage); + CHECK_NOTNULL(master.storage.get()); - master.state = new state::protobuf::State(master.storage); - master.registrar = new master::Registrar(flags, master.state); - master.repairer = new master::Repairer(); + master.state.reset(new state::protobuf::State(master.storage.get())); + master.registrar.reset(new master::Registrar(flags, master.state.get())); + master.repairer.reset(new master::Repairer()); if (url.isSome()) { - master.contender = new ZooKeeperMasterContender(url.get()); - master.detector = new ZooKeeperMasterDetector(url.get()); + master.contender.reset(new ZooKeeperMasterContender(url.get())); + master.detector.reset(new ZooKeeperMasterDetector(url.get())); } else { - master.contender = new StandaloneMasterContender(); - master.detector = new StandaloneMasterDetector(); + master.contender.reset(new StandaloneMasterContender()); + master.detector.reset(new StandaloneMasterDetector()); } if (authorizer.isSome()) { @@ -395,23 +323,25 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start( authorizer_.error() + " (see --acls flag)"); } process::Owned<Authorizer> authorizer__ = authorizer_.get(); - master.authorizer = authorizer__.release(); + master.authorizer = authorizer__; } master.master = new master::Master( - master.allocator, - master.registrar, - master.repairer, + master.allocator.get(), + master.registrar.get(), + master.repairer.get(), &cluster->files, - master.contender, - master.detector, - authorizer.isSome() ? authorizer : master.authorizer, + master.contender.get(), + master.detector.get(), + authorizer.isSome() ? authorizer : master.authorizer.get(), flags); if (url.isNone()) { // This means we are using the StandaloneMasterDetector. 
- CHECK_NOTNULL(dynamic_cast<StandaloneMasterDetector*>(master.detector)) - ->appoint(master.master->info()); + StandaloneMasterDetector* detector_ = CHECK_NOTNULL( + dynamic_cast<StandaloneMasterDetector*>(master.detector.get())); + + detector_->appoint(master.master->info()); } process::Future<Nothing> _recover = @@ -461,22 +391,6 @@ inline Try<Nothing> Cluster::Masters::stop( process::wait(master.master); delete master.master; - delete master.allocator; // Terminates and waits for allocator process. - delete master.allocatorProcess; // May be NULL. - - delete master.repairer; - delete master.registrar; - delete master.state; - delete master.storage; - delete master.log; - - delete master.contender; - delete master.detector; - - if (master.authorizer.isSome()) { - delete master.authorizer.get(); - } - masters.erase(pid); return Nothing(); @@ -537,103 +451,44 @@ inline void Cluster::Slaves::shutdown() inline Try<process::PID<slave::Slave> > Cluster::Slaves::start( - const slave::Flags& flags) -{ - // TODO(benh): Create a work directory if using the default. - - Slave slave; - - slave.flags = flags; - - // Create a new containerizer for this slave. - Try<slave::Containerizer*> containerizer = - slave::Containerizer::create(flags, true); - CHECK_SOME(containerizer); - - slave.containerizer = containerizer.get(); - slave.createdContainerizer = true; - - // Get a detector for the master(s). 
- slave.detector = masters->detector(); - - slave.slave = new slave::Slave( - flags, slave.detector.get(), slave.containerizer, &cluster->files); - process::PID<slave::Slave> pid = process::spawn(slave.slave); - - slaves[pid] = slave; - - return pid; -} - - -inline Try<process::PID<slave::Slave> > Cluster::Slaves::start( - slave::Containerizer* containerizer, - const slave::Flags& flags) -{ - return start(containerizer, None(), flags); -} - - -inline Try<process::PID<slave::Slave> > Cluster::Slaves::start( + const slave::Flags& flags, + const Option<slave::Containerizer*>& containerizer, const Option<MasterDetector*>& detector, - const slave::Flags& flags) + const Option<slave::GarbageCollector*>& gc) { // TODO(benh): Create a work directory if using the default. Slave slave; - slave.flags = flags; - - // Create a new containerizer for this slave. - Try<slave::Containerizer*> containerizer = - slave::Containerizer::create(flags, true); - CHECK_SOME(containerizer); + if (containerizer.isSome()) { + slave.containerizer = containerizer.get(); + } else { + Try<slave::Containerizer*> containerizer = + slave::Containerizer::create(flags, true); + CHECK_SOME(containerizer); - slave.containerizer = containerizer.get(); - slave.createdContainerizer = true; + slave.containerizer = containerizer.get(); + slave.createdContainerizer = true; + } // Get a detector for the master(s) if one wasn't provided. if (detector.isNone()) { slave.detector = masters->detector(); } - slave.slave = new slave::Slave( - flags, - detector.get(slave.detector.get()), - slave.containerizer, - &cluster->files); - - process::PID<slave::Slave> pid = process::spawn(slave.slave); - - slaves[pid] = slave; - - return pid; -} - - -inline Try<process::PID<slave::Slave> > Cluster::Slaves::start( - slave::Containerizer* containerizer, - const Option<MasterDetector*>& detector, - const slave::Flags& flags) -{ - // TODO(benh): Create a work directory if using the default. 
- - Slave slave; - - slave.containerizer = containerizer; + // Create a garbage collector if one wasn't provided. + if (gc.isNone()) { + slave.gc.reset(new slave::GarbageCollector()); + } slave.flags = flags; - // Get a detector for the master(s) if one wasn't provided. - if (detector.isNone()) { - slave.detector = masters->detector(); - } - slave.slave = new slave::Slave( flags, detector.get(slave.detector.get()), - containerizer, - &cluster->files); + slave.containerizer, + &cluster->files, + gc.get(slave.gc.get())); process::PID<slave::Slave> pid = process::spawn(slave.slave); http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index fd49fb6..6689420 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -65,6 +65,7 @@ using process::Future; using process::Message; using process::Owned; using process::PID; +using process::Promise; using process::UPID; using process::http::OK; using process::http::Response; http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/master_authorization_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index b9aa7bf..0804482 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -249,16 +249,16 @@ TEST_F(MasterAuthorizationTest, KillTask) tasks.push_back(task); // Return a pending future from authorizer. 
- Future<Nothing> future; + Future<Nothing> authorize; Promise<bool> promise; EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>())) - .WillOnce(DoAll(FutureSatisfy(&future), + .WillOnce(DoAll(FutureSatisfy(&authorize), Return(promise.future()))); driver.launchTasks(offers.get()[0].id(), tasks); // Wait until authorization is in progress. - AWAIT_READY(future); + AWAIT_READY(authorize); Future<TaskStatus> status; EXPECT_CALL(sched, statusUpdate(&driver, _)) @@ -323,16 +323,16 @@ TEST_F(MasterAuthorizationTest, SlaveRemoved) tasks.push_back(task); // Return a pending future from authorizer. - Future<Nothing> future; + Future<Nothing> authorize; Promise<bool> promise; EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>())) - .WillOnce(DoAll(FutureSatisfy(&future), + .WillOnce(DoAll(FutureSatisfy(&authorize), Return(promise.future()))); driver.launchTasks(offers.get()[0].id(), tasks); // Wait until authorization is in progress. - AWAIT_READY(future); + AWAIT_READY(authorize); Future<Nothing> slaveLost; EXPECT_CALL(sched, slaveLost(&driver, _)) @@ -407,16 +407,16 @@ TEST_F(MasterAuthorizationTest, SlaveDisconnected) tasks.push_back(task); // Return a pending future from authorizer. - Future<Nothing> future; + Future<Nothing> authorize; Promise<bool> promise; EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>())) - .WillOnce(DoAll(FutureSatisfy(&future), + .WillOnce(DoAll(FutureSatisfy(&authorize), Return(promise.future()))); driver.launchTasks(offers.get()[0].id(), tasks); // Wait until authorization is in progress. - AWAIT_READY(future); + AWAIT_READY(authorize); EXPECT_CALL(sched, slaveLost(&driver, _)) .Times(AtMost(1)); @@ -489,16 +489,16 @@ TEST_F(MasterAuthorizationTest, FrameworkRemoved) tasks.push_back(task); // Return a pending future from authorizer. 
- Future<Nothing> future; + Future<Nothing> authorize; Promise<bool> promise; EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>())) - .WillOnce(DoAll(FutureSatisfy(&future), + .WillOnce(DoAll(FutureSatisfy(&authorize), Return(promise.future()))); driver.launchTasks(offers.get()[0].id(), tasks); // Wait until authorization is in progress. - AWAIT_READY(future); + AWAIT_READY(authorize); Future<Nothing> frameworkRemoved = FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved); @@ -569,16 +569,16 @@ TEST_F(MasterAuthorizationTest, PendingExecutorInfoDiffersOnDifferentSlaves) tasks1.push_back(task1); // Return a pending future from authorizer. - Future<Nothing> future; + Future<Nothing> authorize; Promise<bool> promise; EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>())) - .WillOnce(DoAll(FutureSatisfy(&future), + .WillOnce(DoAll(FutureSatisfy(&authorize), Return(promise.future()))); driver.launchTasks(offers1.get()[0].id(), tasks1); // Wait until authorization is in progress. - AWAIT_READY(future); + AWAIT_READY(authorize); Future<vector<Offer> > offers2; EXPECT_CALL(sched, resourceOffers(&driver, _)) @@ -758,26 +758,26 @@ TEST_F(MasterAuthorizationTest, DuplicateRegistration) .WillOnce(FutureSatisfy(&registered)); // Return pending futures from authorizer. 
detector.appoint(master.get()); // Wait until second authorization attempt is in progress. - AWAIT_READY(future2); + AWAIT_READY(authorize2); // Now complete the first authorization attempt. promise1.set(true); @@ -824,15 +824,15 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration) .WillOnce(FutureSatisfy(&registered)); // Return pending futures from authorizer after the first attempt. - Future<Nothing> future2; + Future<Nothing> authorize2; Promise<bool> promise2; - Future<Nothing> future3; + Future<Nothing> authorize3; Promise<bool> promise3; EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RegisterFramework&>())) .WillOnce(Return(true)) - .WillOnce(DoAll(FutureSatisfy(&future2), + .WillOnce(DoAll(FutureSatisfy(&authorize2), Return(promise2.future()))) - .WillOnce(DoAll(FutureSatisfy(&future3), + .WillOnce(DoAll(FutureSatisfy(&authorize3), Return(promise3.future()))); driver.start(); @@ -846,13 +846,13 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration) detector.appoint(master.get()); // Wait until the second authorization attempt is in progress. - AWAIT_READY(future2); + AWAIT_READY(authorize2); // Simulate another spurious leading master change at the scheduler. detector.appoint(master.get()); // Wait until the third authorization attempt is in progress. - AWAIT_READY(future3); + AWAIT_READY(authorize3); Future<Nothing> reregistered; EXPECT_CALL(sched, reregistered(&driver, _)) @@ -894,16 +894,16 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration) &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); // Return a pending future from authorizer. - Future<Nothing> future; + Future<Nothing> authorize; Promise<bool> promise; EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RegisterFramework&>())) - .WillOnce(DoAll(FutureSatisfy(&future), + .WillOnce(DoAll(FutureSatisfy(&authorize), Return(promise.future()))); driver.start(); // Wait until authorization is in progress. 
- AWAIT_READY(future); + AWAIT_READY(authorize); // Stop the framework. // At this point the framework is disconnected but the master does @@ -952,11 +952,11 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration) .WillOnce(FutureSatisfy(&registered)); // Return a pending future from authorizer after first attempt. - Future<Nothing> future2; + Future<Nothing> authorize2; Promise<bool> promise2; EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RegisterFramework&>())) .WillOnce(Return(true)) - .WillOnce(DoAll(FutureSatisfy(&future2), + .WillOnce(DoAll(FutureSatisfy(&authorize2), Return(promise2.future()))); driver.start(); @@ -974,7 +974,7 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration) detector.appoint(master.get()); // Wait until the second authorization attempt is in progress. - AWAIT_READY(future2); + AWAIT_READY(authorize2); Future<Nothing> frameworkRemoved = FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved); http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/mesos.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp index 10a45e3..3dcb2ac 100644 --- a/src/tests/mesos.cpp +++ b/src/tests/mesos.cpp @@ -185,7 +185,8 @@ Try<process::PID<master::Master> > MesosTest::StartMaster( const Option<master::Flags>& flags) { return cluster.masters.start( - allocator, flags.isNone() ? CreateMasterFlags() : flags.get()); + flags.isNone() ? CreateMasterFlags() : flags.get(), + allocator); } @@ -194,7 +195,9 @@ Try<process::PID<master::Master> > MesosTest::StartMaster( const Option<master::Flags>& flags) { return cluster.masters.start( - authorizer, flags.isNone() ? CreateMasterFlags() : flags.get()); + flags.isNone() ? 
CreateMasterFlags() : flags.get(), + None(), + authorizer); } @@ -230,7 +233,8 @@ Try<process::PID<slave::Slave> > MesosTest::StartSlave( const Option<slave::Flags>& flags) { return cluster.slaves.start( - containerizer, flags.isNone() ? CreateSlaveFlags() : flags.get()); + flags.isNone() ? CreateSlaveFlags() : flags.get(), + containerizer); } @@ -240,18 +244,33 @@ Try<process::PID<slave::Slave> > MesosTest::StartSlave( const Option<slave::Flags>& flags) { return cluster.slaves.start( + flags.isNone() ? CreateSlaveFlags() : flags.get(), containerizer, - detector, - flags.isNone() ? CreateSlaveFlags() : flags.get()); + detector); +} + + +Try<PID<slave::Slave> > MesosTest::StartSlave( + MasterDetector* detector, + const Option<slave::Flags>& flags) +{ + return cluster.slaves.start( + flags.isNone() ? CreateSlaveFlags() : flags.get(), + None(), + detector); } Try<PID<slave::Slave> > MesosTest::StartSlave( MasterDetector* detector, + slave::GarbageCollector* gc, const Option<slave::Flags>& flags) { return cluster.slaves.start( - detector, flags.isNone() ? CreateSlaveFlags() : flags.get()); + flags.isNone() ? CreateSlaveFlags() : flags.get(), + None(), + detector, + gc); } @@ -263,9 +282,9 @@ Try<PID<slave::Slave> > MesosTest::StartSlave( slave::Containerizer* containerizer = new TestContainerizer(executor); Try<process::PID<slave::Slave> > pid = cluster.slaves.start( - containerizer, - detector, - flags.isNone() ? CreateSlaveFlags() : flags.get()); + flags.isNone() ? 
CreateSlaveFlags() : flags.get(), + containerizer, + detector); if (pid.isError()) { delete containerizer; http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index b31c347..957e223 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -100,6 +100,25 @@ protected: Authorizer* authorizer, const Option<master::Flags>& flags = None()); + // TODO(bmahler): Consider adding a builder style interface, e.g. + // + // Try<PID<Slave> > slave = + // Slave().With(flags) + // .With(executor) + // .With(containerizer) + // .With(detector) + // .With(gc) + // .Start(); + // + // Or options: + // + // Injections injections; + // injections.executor = executor; + // injections.containerizer = containerizer; + // injections.detector = detector; + // injections.gc = gc; + // Try<PID<Slave> > slave = StartSlave(injections); + // Starts a slave with the specified flags. virtual Try<process::PID<slave::Slave> > StartSlave( const Option<slave::Flags>& flags = None()); @@ -125,6 +144,12 @@ protected: MasterDetector* detector, const Option<slave::Flags>& flags = None()); + // Starts a slave with the specified MasterDetector, GC, and flags. + virtual Try<process::PID<slave::Slave> > StartSlave( + MasterDetector* detector, + slave::GarbageCollector* gc, + const Option<slave::Flags>& flags = None()); + // Starts a slave with the specified mock executor, MasterDetector // and flags. virtual Try<process::PID<slave::Slave> > StartSlave( @@ -480,6 +505,39 @@ public: }; +class MockGarbageCollector : public slave::GarbageCollector +{ +public: + MockGarbageCollector() + { + using ::testing::_; + using ::testing::Return; + + // NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of + // 'ON_CALL' and 'WillByDefault'. See 'TestContainerizer::SetUp()' + // for more details. 
+ EXPECT_CALL(*this, schedule(_, _)) + .WillRepeatedly(Return(Nothing())); + + EXPECT_CALL(*this, unschedule(_)) + .WillRepeatedly(Return(true)); + + EXPECT_CALL(*this, prune(_)) + .WillRepeatedly(Return()); + } + + MOCK_METHOD2( + schedule, + process::Future<Nothing>(const Duration& d, const std::string& path)); + MOCK_METHOD1( + unschedule, + process::Future<bool>(const std::string& path)); + MOCK_METHOD1( + prune, + void(const Duration& d)); +}; + + // Definition of a MockAuthozier that can be used in tests with gmock. class MockAuthorizer : public Authorizer { http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index 8c66659..1c9e73b 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -615,10 +615,10 @@ TEST_F(ReconciliationTest, PendingTask) EXPECT_NE(0u, offers.get().size()); // Return a pending future from authorizer. - Future<Nothing> future; + Future<Nothing> authorize; Promise<bool> promise; EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>())) - .WillOnce(DoAll(FutureSatisfy(&future), + .WillOnce(DoAll(FutureSatisfy(&authorize), Return(promise.future()))); TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); @@ -628,7 +628,7 @@ TEST_F(ReconciliationTest, PendingTask) driver.launchTasks(offers.get()[0].id(), tasks); // Wait until authorization is in progress. - AWAIT_READY(future); + AWAIT_READY(authorize); // First send an implicit reconciliation request for this task. Future<TaskStatus> update;
