Made the GarbageCollector injectable into the Slave.

Review: https://reviews.apache.org/r/25372


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1071d3e3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1071d3e3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1071d3e3

Branch: refs/heads/master
Commit: 1071d3e39ce558aaf541475f8786e4a7003752e0
Parents: 00024c3
Author: Benjamin Mahler <[email protected]>
Authored: Thu Sep 4 10:23:51 2014 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Wed Sep 10 11:05:35 2014 -0700

----------------------------------------------------------------------
 src/local/local.cpp                      |  21 +-
 src/slave/gc.hpp                         |  10 +-
 src/slave/main.cpp                       |   6 +-
 src/slave/slave.cpp                      |  10 +-
 src/slave/slave.hpp                      |   5 +-
 src/tests/cluster.hpp                    | 311 +++++++-------------------
 src/tests/fault_tolerance_tests.cpp      |   1 +
 src/tests/master_authorization_tests.cpp |  66 +++---
 src/tests/mesos.cpp                      |  37 ++-
 src/tests/mesos.hpp                      |  58 +++++
 src/tests/reconciliation_tests.cpp       |   6 +-
 11 files changed, 246 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 9db4dd1..b8a481d 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -50,6 +50,7 @@
 #include "master/repairer.hpp"
 
 #include "slave/containerizer/containerizer.hpp"
+#include "slave/gc.hpp"
 #include "slave/slave.hpp"
 
 #include "state/in_memory.hpp"
@@ -70,6 +71,7 @@ using mesos::internal::master::Registrar;
 using mesos::internal::master::Repairer;
 
 using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::GarbageCollector;
 using mesos::internal::slave::Slave;
 
 using process::Owned;
@@ -100,6 +102,7 @@ static StandaloneMasterDetector* detector = NULL;
 static MasterContender* contender = NULL;
 static Option<Authorizer*> authorizer = None();
 static Files* files = NULL;
+static vector<GarbageCollector*>* garbageCollectors = NULL;
 
 
 PID<Master> launch(const Flags& flags, Allocator* _allocator)
@@ -193,6 +196,8 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
 
   PID<Master> pid = process::spawn(master);
 
+  garbageCollectors = new vector<GarbageCollector*>();
+
   vector<UPID> pids;
 
   for (int i = 0; i < flags.num_slaves; i++) {
@@ -204,6 +209,8 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
               << "slave flags from the environment: " << load.error();
     }
 
+    garbageCollectors->push_back(new GarbageCollector());
+
     Try<Containerizer*> containerizer = Containerizer::create(flags, true);
     if (containerizer.isError()) {
       EXIT(1) << "Failed to create a containerizer: " << containerizer.error();
@@ -214,7 +221,12 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
 
     // NOTE: At this point detector is already initialized by the
     // Master.
-    Slave* slave = new Slave(flags, detector, containerizer.get(), files);
+    Slave* slave = new Slave(
+        flags,
+        detector,
+        containerizer.get(),
+        files,
+        garbageCollectors->back());
     slaves[containerizer.get()] = slave;
     pids.push_back(process::spawn(slave));
   }
@@ -262,6 +274,13 @@ void shutdown()
     delete files;
     files = NULL;
 
+    foreach (GarbageCollector* gc, *garbageCollectors) {
+      delete gc;
+    }
+
+    delete garbageCollectors;
+    garbageCollectors = NULL;
+
     delete registrar;
     registrar = NULL;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/slave/gc.hpp
----------------------------------------------------------------------
diff --git a/src/slave/gc.hpp b/src/slave/gc.hpp
index 8f9398b..780f9c9 100644
--- a/src/slave/gc.hpp
+++ b/src/slave/gc.hpp
@@ -52,7 +52,7 @@ class GarbageCollector
 {
 public:
   GarbageCollector();
-  ~GarbageCollector();
+  virtual ~GarbageCollector();
 
   // Schedules the specified path for removal after the specified
   // duration of time has elapsed. If the path is already scheduled,
@@ -64,18 +64,20 @@ public:
   // was rescheduled.
   // Note that you currently cannot discard a returned future, instead
   // you must call unschedule.
-  process::Future<Nothing> schedule(const Duration& d, const std::string& path);
+  virtual process::Future<Nothing> schedule(
+      const Duration& d,
+      const std::string& path);
 
   // Unschedules the specified path for removal.
   // The future will be true if the path has been unscheduled.
   // The future will be false if the path is not scheduled for
   // removal, or the path has already being removed.
   // Note that you currently cannot discard a returned future.
-  process::Future<bool> unschedule(const std::string& path);
+  virtual process::Future<bool> unschedule(const std::string& path);
 
   // Deletes all the directories, whose scheduled garbage collection time
   // is within the next 'd' duration of time.
-  void prune(const Duration& d);
+  virtual void prune(const Duration& d);
 
 private:
   GarbageCollectorProcess* process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index 319316f..2c4d365 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -33,6 +33,7 @@
 
 #include "logging/logging.hpp"
 
+#include "slave/gc.hpp"
 #include "slave/slave.hpp"
 
 using namespace mesos::internal;
@@ -149,7 +150,10 @@ int main(int argc, char** argv)
   LOG(INFO) << "Starting Mesos slave";
 
   Files files;
-  Slave* slave = new Slave(flags, detector.get(), containerizer.get(), &files);
+  GarbageCollector gc;
+
+  Slave* slave =
+    new Slave(flags, detector.get(), containerizer.get(), &files, &gc);
   process::spawn(slave);
 
   process::wait(slave->self());

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 9536a3b..f1df9d1 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -103,7 +103,8 @@ using namespace state;
 Slave::Slave(const slave::Flags& _flags,
              MasterDetector* _detector,
              Containerizer* _containerizer,
-             Files* _files)
+             Files* _files,
+             GarbageCollector* _gc)
   : ProcessBase(process::ID::generate("slave")),
     state(RECOVERING),
     http(this),
@@ -113,6 +114,7 @@ Slave::Slave(const slave::Flags& _flags,
     containerizer(_containerizer),
     files(_files),
     metrics(*this),
+    gc(_gc),
     monitor(containerizer),
     statusUpdateManager(new StatusUpdateManager()),
     metaDir(paths::getMetaRootDir(flags.work_dir)),
@@ -1000,7 +1002,7 @@ void Slave::doReliableRegistration(const Duration& duration)
 // TODO(vinod): Can we avoid this helper?
 Future<bool> Slave::unschedule(const string& path)
 {
-  return gc.unschedule(path);
+  return gc->unschedule(path);
 }
 
 
@@ -3077,7 +3079,7 @@ void Slave::_checkDiskUsage(const Future<double>& usage)
     // the next 'gc_delay - age'. Since a directory is always
     // scheduled for deletion 'gc_delay' into the future, only directories
     // that are at least 'age' old are deleted.
-    gc.prune(flags.gc_delay - age(usage.get()));
+    gc->prune(flags.gc_delay - age(usage.get()));
   }
   delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);
 }
@@ -3335,7 +3337,7 @@ Future<Nothing> Slave::garbageCollect(const string& path)
   // GC based on the modification time.
   Duration delay = flags.gc_delay - (Clock::now() - time.get());
 
-  return gc.schedule(delay, path);
+  return gc->schedule(delay, path);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d8c7ee4..a418536 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -89,7 +89,8 @@ public:
   Slave(const Flags& flags,
         MasterDetector* detector,
         Containerizer* containerizer,
-        Files* files);
+        Files* files,
+        GarbageCollector* gc);
 
   virtual ~Slave();
 
@@ -438,7 +439,7 @@ private:
 
   process::Time startTime;
 
-  GarbageCollector gc;
+  GarbageCollector* gc;
   ResourceMonitor monitor;
 
   StatusUpdateManager* statusUpdateManager;

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index d857fc6..aad4275 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -62,6 +62,7 @@
 #include "master/repairer.hpp"
 
 #include "slave/flags.hpp"
+#include "slave/gc.hpp"
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/slave.hpp"
 
@@ -92,33 +93,11 @@ public:
 
     void shutdown();
 
-    // Start and manage a new master using the specified flags.
-    // This overload is shorthand to specify that you want default
-    // master objects and is equivalent to passing None to all of
-    // the required arguments of the other overload.
+    // Start a new master with the provided flags and injections.
     Try<process::PID<master::Master> > start(
-        const master::Flags& flags = master::Flags());
-
-    // Start and manage a new master using the specified allocator
-    // process.
-    Try<process::PID<master::Master> > start(
-        master::allocator::AllocatorProcess* allocatorProcess,
-        const master::Flags& flags = master::Flags());
-
-    // Start and manage a new master using the specified authorizer.
-    Try<process::PID<master::Master> > start(
-        Authorizer* authorizer,
-        const master::Flags& flags = master::Flags());
-
-    // Start and manage a new master using the specified flags.
-    // An allocator process or authorizer may be specified in which
-    // case it will outlive the launched master. If either allocator
-    // process or authorizer is not specified then the default
-    // allocator or authorizer will be used.
-    Try<process::PID<master::Master> > start(
-        const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
-        const Option<Authorizer*>& authorizer,
-        const master::Flags& flags = master::Flags());
+        const master::Flags& flags = master::Flags(),
+        const Option<master::allocator::AllocatorProcess*>& allocator = None(),
+        const Option<Authorizer*>& authorizer = None());
 
     // Stops and cleans up a master at the specified PID.
     Try<Nothing> stop(const process::PID<master::Master>& pid);
@@ -137,30 +116,24 @@ public:
     // Encapsulates a single master's dependencies.
     struct Master
     {
-      Master()
-        : master(NULL),
-          allocator(NULL),
-          allocatorProcess(NULL),
-          log(NULL),
-          storage(NULL),
-          state(NULL),
-          registrar(NULL),
-          repairer(NULL),
-          contender(NULL),
-          detector(NULL),
-          authorizer(None()) {}
+      Master() : master(NULL) {}
+
+      process::Owned<master::allocator::AllocatorProcess> allocatorProcess;
+      process::Owned<master::allocator::Allocator> allocator;
+
+      process::Owned<log::Log> log;
+      process::Owned<state::Storage> storage;
+      process::Owned<state::protobuf::State> state;
+      process::Owned<master::Registrar> registrar;
+
+      process::Owned<master::Repairer> repairer;
+
+      process::Owned<MasterContender> contender;
+      process::Owned<MasterDetector> detector;
+
+      process::Owned<Authorizer> authorizer;
 
       master::Master* master;
-      master::allocator::Allocator* allocator;
-      master::allocator::AllocatorProcess* allocatorProcess;
-      log::Log* log;
-      state::Storage* storage;
-      state::protobuf::State* state;
-      master::Registrar* registrar;
-      master::Repairer* repairer;
-      MasterContender* contender;
-      MasterDetector* detector;
-      Option<Authorizer*> authorizer;
     };
 
     std::map<process::PID<master::Master>, Master> masters;
@@ -176,29 +149,12 @@ public:
     // Stop and clean up all slaves.
     void shutdown();
 
-    // Start and manage a new slave with a process isolator using the
-    // specified flags.
-    Try<process::PID<slave::Slave> > start(
-        const slave::Flags& flags = slave::Flags());
-
-    // Start and manage a new slave injecting the specified isolator.
-    // The isolator is expected to outlive the launched slave (i.e.,
-    // until it is stopped via Slaves::stop).
+    // Start a new slave with the provided flags and injections.
     Try<process::PID<slave::Slave> > start(
-        slave::Containerizer* containerizer,
-        const slave::Flags& flags = slave::Flags());
-
-    // Start and manage a new slave injecting the specified Master
-    // Detector. The detector is expected to outlive the launched
-    // slave (i.e., until it is stopped via Slaves::stop).
-    Try<process::PID<slave::Slave> > start(
-        const Option<MasterDetector*>& detector,
-        const slave::Flags& flags = slave::Flags());
-
-    Try<process::PID<slave::Slave> > start(
-        slave::Containerizer* containerizer,
-        const Option<MasterDetector*>& detector,
-        const slave::Flags& flags = slave::Flags());
+        const slave::Flags& flags = slave::Flags(),
+        const Option<slave::Containerizer*>& containerizer = None(),
+        const Option<MasterDetector*>& detector = None(),
+        const Option<slave::GarbageCollector*>& gc = None());
 
     // Stops and cleans up a slave at the specified PID. If 'shutdown'
     // is true than the slave is sent a shutdown message instead of
@@ -221,19 +177,15 @@ public:
       Slave()
         : containerizer(NULL),
           createdContainerizer(false),
-          slave(NULL),
-          detector(NULL) {}
+          slave(NULL) {}
 
-      // Register the slave's containerizer here.
       slave::Containerizer* containerizer;
-      // Record if we created the containerizer so we know to delete it when
-      // stopping the slave.
-      bool createdContainerizer;
-      slave::Slave* slave;
-      process::Owned<MasterDetector> detector;
+      bool createdContainerizer; // Whether we own the containerizer.
 
-      // Set to the slave::flags used for the slave.
+      process::Owned<slave::GarbageCollector> gc;
+      process::Owned<MasterDetector> detector;
       slave::Flags flags;
+      slave::Slave* slave;
     };
 
     std::map<process::PID<slave::Slave>, Slave> slaves;
@@ -284,32 +236,9 @@ inline void Cluster::Masters::shutdown()
 
 
 inline Try<process::PID<master::Master> > Cluster::Masters::start(
-    const master::Flags& flags)
-{
-  return start(None(), None(), flags);
-}
-
-
-inline Try<process::PID<master::Master> > Cluster::Masters::start(
-    master::allocator::AllocatorProcess* allocator,
-    const master::Flags& flags)
-{
-  return start(allocator, None(), flags);
-}
-
-
-inline Try<process::PID<master::Master> > Cluster::Masters::start(
-    Authorizer* authorizer,
-    const master::Flags& flags)
-{
-  return start(None(), authorizer, flags);
-}
-
-
-inline Try<process::PID<master::Master> > Cluster::Masters::start(
+    const master::Flags& flags,
     const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
-    const Option<Authorizer*>& authorizer,
-    const master::Flags& flags)
+    const Option<Authorizer*>& authorizer)
 {
   // Disallow multiple masters when not using ZooKeeper.
   if (!masters.empty() && url.isNone()) {
@@ -319,14 +248,13 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
   Master master;
 
   if (allocatorProcess.isNone()) {
-    master.allocatorProcess =
-        new master::allocator::HierarchicalDRFAllocatorProcess();
-    master.allocator =
-        new master::allocator::Allocator(master.allocatorProcess);
+    master.allocatorProcess.reset(
+        new master::allocator::HierarchicalDRFAllocatorProcess());
+    master.allocator.reset(
+        new master::allocator::Allocator(master.allocatorProcess.get()));
   } else {
-    master.allocatorProcess = NULL;
-    master.allocator =
-        new master::allocator::Allocator(allocatorProcess.get());
+    master.allocator.reset(
+        new master::allocator::Allocator(allocatorProcess.get()));
   }
 
   if (flags.registry == "in_memory") {
@@ -335,7 +263,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
           "Cannot use '--registry_strict' when using in-memory storage based"
           " registry");
     }
-    master.storage = new state::InMemoryStorage();
+    master.storage.reset(new state::InMemoryStorage());
   } else if (flags.registry == "replicated_log") {
     if (flags.work_dir.isNone()) {
       return Error(
@@ -349,40 +277,40 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
             "Need to specify --quorum for replicated log based registry"
             " when using ZooKeeper");
       }
-      master.log = new log::Log(
+      master.log.reset(new log::Log(
           flags.quorum.get(),
           path::join(flags.work_dir.get(), "replicated_log"),
           url.get().servers,
           flags.zk_session_timeout,
           path::join(url.get().path, "log_replicas"),
           url.get().authentication,
-          flags.log_auto_initialize);
+          flags.log_auto_initialize));
     } else {
-      master.log = new log::Log(
+      master.log.reset(new log::Log(
           1,
           path::join(flags.work_dir.get(), "replicated_log"),
           std::set<process::UPID>(),
-          flags.log_auto_initialize);
+          flags.log_auto_initialize));
     }
 
-    master.storage = new state::LogStorage(master.log);
+    master.storage.reset(new state::LogStorage(master.log.get()));
   } else {
     return Error("'" + flags.registry + "' is not a supported option for"
                  " registry persistence");
   }
 
-  CHECK_NOTNULL(master.storage);
+  CHECK_NOTNULL(master.storage.get());
 
-  master.state = new state::protobuf::State(master.storage);
-  master.registrar = new master::Registrar(flags, master.state);
-  master.repairer = new master::Repairer();
+  master.state.reset(new state::protobuf::State(master.storage.get()));
+  master.registrar.reset(new master::Registrar(flags, master.state.get()));
+  master.repairer.reset(new master::Repairer());
 
   if (url.isSome()) {
-    master.contender = new ZooKeeperMasterContender(url.get());
-    master.detector = new ZooKeeperMasterDetector(url.get());
+    master.contender.reset(new ZooKeeperMasterContender(url.get()));
+    master.detector.reset(new ZooKeeperMasterDetector(url.get()));
   } else {
-    master.contender = new StandaloneMasterContender();
-    master.detector = new StandaloneMasterDetector();
+    master.contender.reset(new StandaloneMasterContender());
+    master.detector.reset(new StandaloneMasterDetector());
   }
 
   if (authorizer.isSome()) {
@@ -395,23 +323,25 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
                    authorizer_.error() + " (see --acls flag)");
     }
     process::Owned<Authorizer> authorizer__ = authorizer_.get();
-    master.authorizer = authorizer__.release();
+    master.authorizer = authorizer__;
   }
 
   master.master = new master::Master(
-      master.allocator,
-      master.registrar,
-      master.repairer,
+      master.allocator.get(),
+      master.registrar.get(),
+      master.repairer.get(),
       &cluster->files,
-      master.contender,
-      master.detector,
-      authorizer.isSome() ? authorizer : master.authorizer,
+      master.contender.get(),
+      master.detector.get(),
+      authorizer.isSome() ? authorizer : master.authorizer.get(),
       flags);
 
   if (url.isNone()) {
     // This means we are using the StandaloneMasterDetector.
-    CHECK_NOTNULL(dynamic_cast<StandaloneMasterDetector*>(master.detector))
-        ->appoint(master.master->info());
+    StandaloneMasterDetector* detector_ = CHECK_NOTNULL(
+        dynamic_cast<StandaloneMasterDetector*>(master.detector.get()));
+
+    detector_->appoint(master.master->info());
   }
 
   process::Future<Nothing> _recover =
@@ -461,22 +391,6 @@ inline Try<Nothing> Cluster::Masters::stop(
   process::wait(master.master);
   delete master.master;
 
-  delete master.allocator; // Terminates and waits for allocator process.
-  delete master.allocatorProcess; // May be NULL.
-
-  delete master.repairer;
-  delete master.registrar;
-  delete master.state;
-  delete master.storage;
-  delete master.log;
-
-  delete master.contender;
-  delete master.detector;
-
-  if (master.authorizer.isSome()) {
-    delete master.authorizer.get();
-  }
-
   masters.erase(pid);
 
   return Nothing();
@@ -537,103 +451,44 @@ inline void Cluster::Slaves::shutdown()
 
 
 inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
-    const slave::Flags& flags)
-{
-  // TODO(benh): Create a work directory if using the default.
-
-  Slave slave;
-
-  slave.flags = flags;
-
-  // Create a new containerizer for this slave.
-  Try<slave::Containerizer*> containerizer =
-    slave::Containerizer::create(flags, true);
-  CHECK_SOME(containerizer);
-
-  slave.containerizer = containerizer.get();
-  slave.createdContainerizer = true;
-
-  // Get a detector for the master(s).
-  slave.detector = masters->detector();
-
-  slave.slave = new slave::Slave(
-      flags, slave.detector.get(), slave.containerizer, &cluster->files);
-  process::PID<slave::Slave> pid = process::spawn(slave.slave);
-
-  slaves[pid] = slave;
-
-  return pid;
-}
-
-
-inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
-    slave::Containerizer* containerizer,
-    const slave::Flags& flags)
-{
-  return start(containerizer, None(), flags);
-}
-
-
-inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
+    const slave::Flags& flags,
+    const Option<slave::Containerizer*>& containerizer,
     const Option<MasterDetector*>& detector,
-    const slave::Flags& flags)
+    const Option<slave::GarbageCollector*>& gc)
 {
   // TODO(benh): Create a work directory if using the default.
 
   Slave slave;
 
-  slave.flags = flags;
-
-  // Create a new containerizer for this slave.
-  Try<slave::Containerizer*> containerizer =
-    slave::Containerizer::create(flags, true);
-  CHECK_SOME(containerizer);
+  if (containerizer.isSome()) {
+    slave.containerizer = containerizer.get();
+  } else {
+    Try<slave::Containerizer*> containerizer =
+      slave::Containerizer::create(flags, true);
+    CHECK_SOME(containerizer);
 
-  slave.containerizer = containerizer.get();
-  slave.createdContainerizer = true;
+    slave.containerizer = containerizer.get();
+    slave.createdContainerizer = true;
+  }
 
   // Get a detector for the master(s) if one wasn't provided.
   if (detector.isNone()) {
     slave.detector = masters->detector();
   }
 
-  slave.slave = new slave::Slave(
-      flags,
-      detector.get(slave.detector.get()),
-      slave.containerizer,
-      &cluster->files);
-
-  process::PID<slave::Slave> pid = process::spawn(slave.slave);
-
-  slaves[pid] = slave;
-
-  return pid;
-}
-
-
-inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
-    slave::Containerizer* containerizer,
-    const Option<MasterDetector*>& detector,
-    const slave::Flags& flags)
-{
-  // TODO(benh): Create a work directory if using the default.
-
-  Slave slave;
-
-  slave.containerizer = containerizer;
+  // Create a garbage collector if one wasn't provided.
+  if (gc.isNone()) {
+    slave.gc.reset(new slave::GarbageCollector());
+  }
 
   slave.flags = flags;
 
-  // Get a detector for the master(s) if one wasn't provided.
-  if (detector.isNone()) {
-    slave.detector = masters->detector();
-  }
-
   slave.slave = new slave::Slave(
       flags,
       detector.get(slave.detector.get()),
-      containerizer,
-      &cluster->files);
+      slave.containerizer,
+      &cluster->files,
+      gc.get(slave.gc.get()));
 
   process::PID<slave::Slave> pid = process::spawn(slave.slave);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index fd49fb6..6689420 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -65,6 +65,7 @@ using process::Future;
 using process::Message;
 using process::Owned;
 using process::PID;
+using process::Promise;
 using process::UPID;
 using process::http::OK;
 using process::http::Response;

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index b9aa7bf..0804482 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -249,16 +249,16 @@ TEST_F(MasterAuthorizationTest, KillTask)
   tasks.push_back(task);
 
   // Return a pending future from authorizer.
-  Future<Nothing> future;
+  Future<Nothing> authorize;
   Promise<bool> promise;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future),
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
                     Return(promise.future())));
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
   // Wait until authorization is in progress.
-  AWAIT_READY(future);
+  AWAIT_READY(authorize);
 
   Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
@@ -323,16 +323,16 @@ TEST_F(MasterAuthorizationTest, SlaveRemoved)
   tasks.push_back(task);
 
   // Return a pending future from authorizer.
-  Future<Nothing> future;
+  Future<Nothing> authorize;
   Promise<bool> promise;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future),
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
                     Return(promise.future())));
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
   // Wait until authorization is in progress.
-  AWAIT_READY(future);
+  AWAIT_READY(authorize);
 
   Future<Nothing> slaveLost;
   EXPECT_CALL(sched, slaveLost(&driver, _))
@@ -407,16 +407,16 @@ TEST_F(MasterAuthorizationTest, SlaveDisconnected)
   tasks.push_back(task);
 
   // Return a pending future from authorizer.
-  Future<Nothing> future;
+  Future<Nothing> authorize;
   Promise<bool> promise;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future),
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
                     Return(promise.future())));
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
   // Wait until authorization is in progress.
-  AWAIT_READY(future);
+  AWAIT_READY(authorize);
 
   EXPECT_CALL(sched, slaveLost(&driver, _))
     .Times(AtMost(1));
@@ -489,16 +489,16 @@ TEST_F(MasterAuthorizationTest, FrameworkRemoved)
   tasks.push_back(task);
 
   // Return a pending future from authorizer.
-  Future<Nothing> future;
+  Future<Nothing> authorize;
   Promise<bool> promise;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future),
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
                     Return(promise.future())));
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
   // Wait until authorization is in progress.
-  AWAIT_READY(future);
+  AWAIT_READY(authorize);
 
   Future<Nothing> frameworkRemoved =
     FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
@@ -569,16 +569,16 @@ TEST_F(MasterAuthorizationTest, PendingExecutorInfoDiffersOnDifferentSlaves)
   tasks1.push_back(task1);
 
   // Return a pending future from authorizer.
-  Future<Nothing> future;
+  Future<Nothing> authorize;
   Promise<bool> promise;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future),
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
                     Return(promise.future())));
 
   driver.launchTasks(offers1.get()[0].id(), tasks1);
 
   // Wait until authorization is in progress.
-  AWAIT_READY(future);
+  AWAIT_READY(authorize);
 
   Future<vector<Offer> > offers2;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -758,26 +758,26 @@ TEST_F(MasterAuthorizationTest, DuplicateRegistration)
     .WillOnce(FutureSatisfy(&registered));
 
   // Return pending futures from authorizer.
-  Future<Nothing> future1;
+  Future<Nothing> authorize1;
   Promise<bool> promise1;
-  Future<Nothing> future2;
+  Future<Nothing> authorize2;
   Promise<bool> promise2;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RegisterFramework&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future1),
+    .WillOnce(DoAll(FutureSatisfy(&authorize1),
                     Return(promise1.future())))
-    .WillOnce(DoAll(FutureSatisfy(&future2),
+    .WillOnce(DoAll(FutureSatisfy(&authorize2),
                     Return(promise2.future())));
 
   driver.start();
 
   // Wait until first authorization attempt is in progress.
-  AWAIT_READY(future1);
+  AWAIT_READY(authorize1);
 
   // Simulate a spurious leading master change at the scheduler.
   detector.appoint(master.get());
 
   // Wait until second authorization attempt is in progress.
-  AWAIT_READY(future2);
+  AWAIT_READY(authorize2);
 
   // Now complete the first authorization attempt.
   promise1.set(true);
@@ -824,15 +824,15 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration)
     .WillOnce(FutureSatisfy(&registered));
 
   // Return pending futures from authorizer after the first attempt.
-  Future<Nothing> future2;
+  Future<Nothing> authorize2;
   Promise<bool> promise2;
-  Future<Nothing> future3;
+  Future<Nothing> authorize3;
   Promise<bool> promise3;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RegisterFramework&>()))
     .WillOnce(Return(true))
-    .WillOnce(DoAll(FutureSatisfy(&future2),
+    .WillOnce(DoAll(FutureSatisfy(&authorize2),
                     Return(promise2.future())))
-    .WillOnce(DoAll(FutureSatisfy(&future3),
+    .WillOnce(DoAll(FutureSatisfy(&authorize3),
                     Return(promise3.future())));
 
   driver.start();
@@ -846,13 +846,13 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration)
   detector.appoint(master.get());
 
   // Wait until the second authorization attempt is in progress.
-  AWAIT_READY(future2);
+  AWAIT_READY(authorize2);
 
   // Simulate another spurious leading master change at the scheduler.
   detector.appoint(master.get());
 
   // Wait until the third authorization attempt is in progress.
-  AWAIT_READY(future3);
+  AWAIT_READY(authorize3);
 
   Future<Nothing> reregistered;
   EXPECT_CALL(sched, reregistered(&driver, _))
@@ -894,16 +894,16 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration)
       &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
   // Return a pending future from authorizer.
-  Future<Nothing> future;
+  Future<Nothing> authorize;
   Promise<bool> promise;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RegisterFramework&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future),
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
                     Return(promise.future())));
 
   driver.start();
 
   // Wait until authorization is in progress.
-  AWAIT_READY(future);
+  AWAIT_READY(authorize);
 
   // Stop the framework.
   // At this point the framework is disconnected but the master does
@@ -952,11 +952,11 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration)
     .WillOnce(FutureSatisfy(&registered));
 
   // Return a pending future from authorizer after first attempt.
-  Future<Nothing> future2;
+  Future<Nothing> authorize2;
   Promise<bool> promise2;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RegisterFramework&>()))
     .WillOnce(Return(true))
-    .WillOnce(DoAll(FutureSatisfy(&future2),
+    .WillOnce(DoAll(FutureSatisfy(&authorize2),
                     Return(promise2.future())));
 
   driver.start();
@@ -974,7 +974,7 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration)
   detector.appoint(master.get());
 
   // Wait until the second authorization attempt is in progress.
-  AWAIT_READY(future2);
+  AWAIT_READY(authorize2);
 
   Future<Nothing> frameworkRemoved =
     FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 10a45e3..3dcb2ac 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -185,7 +185,8 @@ Try<process::PID<master::Master> > MesosTest::StartMaster(
     const Option<master::Flags>& flags)
 {
   return cluster.masters.start(
-      allocator, flags.isNone() ? CreateMasterFlags() : flags.get());
+      flags.isNone() ? CreateMasterFlags() : flags.get(),
+      allocator);
 }
 
 
@@ -194,7 +195,9 @@ Try<process::PID<master::Master> > MesosTest::StartMaster(
     const Option<master::Flags>& flags)
 {
   return cluster.masters.start(
-      authorizer, flags.isNone() ? CreateMasterFlags() : flags.get());
+      flags.isNone() ? CreateMasterFlags() : flags.get(),
+      None(),
+      authorizer);
 }
 
 
@@ -230,7 +233,8 @@ Try<process::PID<slave::Slave> > MesosTest::StartSlave(
     const Option<slave::Flags>& flags)
 {
   return cluster.slaves.start(
-      containerizer, flags.isNone() ? CreateSlaveFlags() : flags.get());
+      flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      containerizer);
 }
 
 
@@ -240,18 +244,33 @@ Try<process::PID<slave::Slave> > MesosTest::StartSlave(
     const Option<slave::Flags>& flags)
 {
   return cluster.slaves.start(
+      flags.isNone() ? CreateSlaveFlags() : flags.get(),
       containerizer,
-      detector,
-      flags.isNone() ? CreateSlaveFlags() : flags.get());
+      detector);
+}
+
+
+Try<PID<slave::Slave> > MesosTest::StartSlave(
+    MasterDetector* detector,
+    const Option<slave::Flags>& flags)
+{
+  return cluster.slaves.start(
+      flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
+      detector);
 }
 
 
 Try<PID<slave::Slave> > MesosTest::StartSlave(
     MasterDetector* detector,
+    slave::GarbageCollector* gc,
     const Option<slave::Flags>& flags)
 {
   return cluster.slaves.start(
-      detector, flags.isNone() ? CreateSlaveFlags() : flags.get());
+      flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
+      detector,
+      gc);
 }
 
 
@@ -263,9 +282,9 @@ Try<PID<slave::Slave> > MesosTest::StartSlave(
   slave::Containerizer* containerizer = new TestContainerizer(executor);
 
   Try<process::PID<slave::Slave> > pid = cluster.slaves.start(
-      containerizer,
-      detector,
-      flags.isNone() ? CreateSlaveFlags() : flags.get());
+      flags.isNone() ? CreateSlaveFlags() : flags.get(),
+          containerizer,
+      detector);
 
   if (pid.isError()) {
     delete containerizer;

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index b31c347..957e223 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -100,6 +100,25 @@ protected:
       Authorizer* authorizer,
       const Option<master::Flags>& flags = None());
 
+  // TODO(bmahler): Consider adding a builder style interface, e.g.
+  //
+  // Try<PID<Slave> > slave =
+  //   Slave().With(flags)
+  //          .With(executor)
+  //          .With(containerizer)
+  //          .With(detector)
+  //          .With(gc)
+  //          .Start();
+  //
+  // Or options:
+  //
+  // Injections injections;
+  // injections.executor = executor;
+  // injections.containerizer = containerizer;
+  // injections.detector = detector;
+  // injections.gc = gc;
+  // Try<PID<Slave> > slave = StartSlave(injections);
+
   // Starts a slave with the specified flags.
   virtual Try<process::PID<slave::Slave> > StartSlave(
       const Option<slave::Flags>& flags = None());
@@ -125,6 +144,12 @@ protected:
       MasterDetector* detector,
       const Option<slave::Flags>& flags = None());
 
+  // Starts a slave with the specified MasterDetector, GC, and flags.
+  virtual Try<process::PID<slave::Slave> > StartSlave(
+      MasterDetector* detector,
+      slave::GarbageCollector* gc,
+      const Option<slave::Flags>& flags = None());
+
   // Starts a slave with the specified mock executor, MasterDetector
   // and flags.
   virtual Try<process::PID<slave::Slave> > StartSlave(
@@ -480,6 +505,39 @@ public:
 };
 
 
+class MockGarbageCollector : public slave::GarbageCollector
+{
+public:
+  MockGarbageCollector()
+  {
+    using ::testing::_;
+    using ::testing::Return;
+
+    // NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+    // 'ON_CALL' and 'WillByDefault'. See 'TestContainerizer::SetUp()'
+    // for more details.
+    EXPECT_CALL(*this, schedule(_, _))
+      .WillRepeatedly(Return(Nothing()));
+
+    EXPECT_CALL(*this, unschedule(_))
+      .WillRepeatedly(Return(true));
+
+    EXPECT_CALL(*this, prune(_))
+      .WillRepeatedly(Return());
+  }
+
+  MOCK_METHOD2(
+      schedule,
+      process::Future<Nothing>(const Duration& d, const std::string& path));
+  MOCK_METHOD1(
+      unschedule,
+      process::Future<bool>(const std::string& path));
+  MOCK_METHOD1(
+      prune,
+      void(const Duration& d));
+};
+
+
 // Definition of a MockAuthozier that can be used in tests with gmock.
 class MockAuthorizer : public Authorizer
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/1071d3e3/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 8c66659..1c9e73b 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -615,10 +615,10 @@ TEST_F(ReconciliationTest, PendingTask)
   EXPECT_NE(0u, offers.get().size());
 
   // Return a pending future from authorizer.
-  Future<Nothing> future;
+  Future<Nothing> authorize;
   Promise<bool> promise;
   EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future),
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
                     Return(promise.future())));
 
   TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
@@ -628,7 +628,7 @@ TEST_F(ReconciliationTest, PendingTask)
   driver.launchTasks(offers.get()[0].id(), tasks);
 
   // Wait until authorization is in progress.
-  AWAIT_READY(future);
+  AWAIT_READY(authorize);
 
   // First send an implicit reconciliation request for this task.
   Future<TaskStatus> update;

Reply via email to