Repository: mesos
Updated Branches:
  refs/heads/master b4938753a -> f511395e8


Serialize isolator prepare and cleanup (reversed).

Review: https://reviews.apache.org/r/25861


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47fa5a11
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47fa5a11
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47fa5a11

Branch: refs/heads/master
Commit: 47fa5a11ba09614c4b780b16da64bf1c276f50ef
Parents: 8e6e36a
Author: Ian Downes <[email protected]>
Authored: Thu Sep 11 23:17:11 2014 -0700
Committer: Ian Downes <[email protected]>
Committed: Mon Oct 27 10:36:36 2014 -0700

----------------------------------------------------------------------
 src/slave/containerizer/mesos/containerizer.cpp | 202 +++++++++++++------
 src/slave/containerizer/mesos/containerizer.hpp |   4 +-
 2 files changed, 147 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/47fa5a11/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 9f745d8..ce92878 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -451,22 +451,51 @@ Future<bool> MesosContainerizerProcess::launch(
       checkpoint);
 }
 
-Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare(
+
+static list<Option<CommandInfo>> accumulate(
+    list<Option<CommandInfo>> l,
+    const Option<CommandInfo>& e)
+{
+  l.push_back(e);
+  return l;
+}
+
+
+static Future<list<Option<CommandInfo>>> _prepare(
+    const Owned<Isolator>& isolator,
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const list<Option<CommandInfo>> commands)
+{
+  // Propagate any failure.
+  return isolator->prepare(containerId, executorInfo, directory)
+    .then(lambda::bind(&accumulate, commands, lambda::_1));
+}
+
+
+Future<list<Option<CommandInfo>>> MesosContainerizerProcess::prepare(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
     const string& directory,
     const Option<string>& user)
 {
-  // Start preparing all isolators (in parallel) and gather any additional
-  // preparation comands that must be run in the forked child before exec'ing
-  // the executor.
-  list<Future<Option<CommandInfo> > > futures;
+  // We prepare the isolators sequentially according to their ordering
+  // to permit basic dependency specification, e.g., preparing a
+  // filesystem isolator before other isolators.
+  Future<list<Option<CommandInfo>>> f = list<Option<CommandInfo>>();
+
   foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->prepare(containerId, executorInfo));
+    // Chain together preparing each isolator.
+    f = f.then(lambda::bind(&_prepare,
+                            isolator,
+                            containerId,
+                            executorInfo,
+                            directory,
+                            lambda::_1));
   }
 
-  // Wait for all isolators to complete preparations.
-  return collect(futures);
+  return f;
 }
 
 
@@ -738,6 +767,9 @@ Future<bool> MesosContainerizerProcess::isolate(
   }
 
   // Isolate the executor with each isolator.
+  // NOTE: This is done in parallel and is not sequenced like prepare
+  // or destroy because we assume there are no dependencies in
+  // isolation.
   list<Future<Nothing> > futures;
   foreach (const Owned<Isolator>& isolator, isolators) {
     futures.push_back(isolator->isolate(containerId, _pid));
@@ -753,14 +785,14 @@ Future<bool> MesosContainerizerProcess::exec(
     const ContainerID& containerId,
     int pipeWrite)
 {
-  // The container may be destroyed before we exec the executor so return
-  // failure here.
+  // The container may be destroyed before we exec the executor so
+  // return failure here.
   if (!promises.contains(containerId)) {
     return Failure("Container destroyed during launch");
   }
 
-  // Now that we've contained the child we can signal it to continue by
-  // writing to the pipe.
+  // Now that we've contained the child we can signal it to continue
+  // by writing to the pipe.
   char dummy;
   ssize_t length;
   while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 &&
@@ -790,9 +822,9 @@ Future<Nothing> MesosContainerizerProcess::update(
     const ContainerID& containerId,
     const Resources& _resources)
 {
-  // The resources hashmap won't initially contain the container's resources
-  // after recovery so we must check the promises hashmap (which is set during
-  // recovery).
+  // The resources hashmap won't initially contain the container's
+  // resources after recovery so we must check the promises hashmap
+  // (which is set during recovery).
   if (!promises.contains(containerId)) {
     // It is not considered a failure if the container is not known
     // because the slave will attempt to update the container's
@@ -817,9 +849,9 @@ Future<Nothing> MesosContainerizerProcess::update(
 }
 
 
-// Resources are used to set the limit fields in the statistics but are
-// optional because they aren't known after recovery until/unless update() is
-// called.
+// Resources are used to set the limit fields in the statistics but
+// are optional because they aren't known after recovery until/unless
+// update() is called.
 Future<ResourceStatistics> _usage(
     const ContainerID& containerId,
     const Option<Resources>& resources,
@@ -871,8 +903,8 @@ Future<ResourceStatistics> MesosContainerizerProcess::usage(
   }
 
   // Use await() here so we can return partial usage statistics.
-  // TODO(idownes): After recovery resources won't be known until after an
-  // update() because they aren't part of the SlaveState.
+  // TODO(idownes): After recovery resources won't be known until
+  // after an update() because they aren't part of the SlaveState.
   return await(futures)
     .then(lambda::bind(
           _usage, containerId, resources.get(containerId), lambda::_1));
@@ -899,8 +931,8 @@ void MesosContainerizerProcess::destroy(const ContainerID& containerId)
     launcher->destroy(containerId)
       .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
   } else {
-    // The executor never forked so no processes to kill, go straight to
-    // __destroy() with status = None().
+    // The executor never forked so no processes to kill, go straight
+    // to __destroy() with status = None().
     __destroy(containerId, None());
   }
 }
@@ -910,12 +942,12 @@ void MesosContainerizerProcess::_destroy(
     const ContainerID& containerId,
     const Future<Nothing>& future)
 {
-  // Something has gone wrong and the launcher wasn't able to kill all the
-  // processes in the container. We cannot clean up the isolators because they
-  // may require that all processes have exited so just return the failure to
-  // the slave.
-  // TODO(idownes): This is a pretty bad state to be in but we should consider
-  // cleaning up here.
+  // Something has gone wrong and the launcher wasn't able to kill all
+  // the processes in the container. We cannot clean up the isolators
+  // because they may require that all processes have exited so just
+  // return the failure to the slave.
+  // TODO(idownes): This is a pretty bad state to be in but we should
+  // consider cleaning up here.
   if (!future.isReady()) {
     promises[containerId]->fail(
         "Failed to destroy container: " +
@@ -925,53 +957,109 @@ void MesosContainerizerProcess::_destroy(
     return;
   }
 
-  // We've successfully killed all processes in the container so get the exit
-  // status of the executor when it's ready (it may already be) and continue
-  // the destroy.
+  // We've successfully killed all processes in the container so get
+  // the exit status of the executor when it's ready (it may already
+  // be) and continue the destroy.
   statuses.get(containerId).get()
     .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
 }
 
 
+static list<Future<Nothing>> _cleanup(const list<Future<Nothing>>& cleanups)
+{
+  return cleanups;
+}
+
+
+static Future<list<Future<Nothing>>> cleanup(
+    const Owned<Isolator>& isolator,
+    const ContainerID& containerId,
+    list<Future<Nothing>> cleanups)
+{
+  // Accumulate but do not propagate any failure.
+  Future<Nothing> cleanup = isolator->cleanup(containerId);
+  cleanups.push_back(cleanup);
+
+  // Wait for the cleanup to complete/fail before returning the list.
+  // We use await here to asynchronously wait for the isolator to
+  // complete then return cleanups.
+  list<Future<Nothing>> cleanup_;
+  cleanup_.push_back(cleanup);
+
+  return await(cleanup_)
+    .then(lambda::bind(&_cleanup, cleanups));
+}
+
+
+// TODO(idownes): Use a reversed view of the container rather than
+// reversing a copy.
+template <typename T>
+static T reversed(const T& t)
+{
+  T r = t;
+  std::reverse(r.begin(), r.end());
+  return r;
+}
+
+
 void MesosContainerizerProcess::__destroy(
     const ContainerID& containerId,
     const Future<Option<int > >& status)
 {
-  // Now that all processes have exited we can now clean up all isolators.
-  list<Future<Nothing> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->cleanup(containerId));
+  // We clean up each isolator in the reverse order they were
+  // prepared (see comment in prepare()).
+  Future<list<Future<Nothing>>> f = list<Future<Nothing>>();
+
+  foreach (const Owned<Isolator>& isolator, reversed(isolators)) {
+    // We'll try to clean up all isolators, waiting for each to
+    // complete and continuing if one fails.
+    f = f.then(lambda::bind(&cleanup,
+                            isolator,
+                            containerId,
+                            lambda::_1));
   }
 
-  // Wait for all isolators to complete cleanup before continuing.
-  collect(futures)
-    .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1));
+  // Continue destroy when we're done trying to clean up.
+  f.onAny(defer(self(),
+                &Self::___destroy,
+                containerId,
+                status,
+                lambda::_1));
+
+  return;
 }
 
 
 void MesosContainerizerProcess::___destroy(
     const ContainerID& containerId,
-    const Future<Option<int > >& status,
-    const Future<list<Nothing> >& futures)
+    const Future<Option<int>>& status,
+    const Future<list<Future<Nothing>>>& cleanups)
 {
-  // Something has gone wrong with one of the Isolators and cleanup failed.
-  // We'll fail the container termination and remove the 'destroying' flag but
-  // leave all other state. The containerizer is now in a bad state because
-  // at least one isolator has failed to clean up.
-  if (!futures.isReady()) {
-    promises[containerId]->fail(
-        "Failed to clean up isolators when destroying container: " +
-        (futures.isFailed() ? futures.failure() : "discarded future"));
-
-    destroying.erase(containerId);
-
-    return;
+  // This should not occur because we only use the Future<list> to
+  // facilitate chaining.
+  CHECK_READY(cleanups);
+
+  // Check cleanup succeeded for all isolators. If not, we'll fail the
+  // container termination and remove the 'destroying' flag but leave
+  // all other state. The container is now in an inconsistent state.
+  foreach (const Future<Nothing>& cleanup, cleanups.get()) {
+    if (!cleanup.isReady()) {
+      promises[containerId]->fail(
+        "Failed to clean up an isolator when destroying container '" +
+        stringify(containerId) + "' :" +
+        (cleanup.isFailed() ? cleanup.failure() : "discarded future"));
+
+      destroying.erase(containerId);
+
+      return;
+    }
   }
 
   // A container is 'killed' if any isolator limited it.
-  // Note: We may not see a limitation in time for it to be registered. This
-  // could occur if the limitation (e.g., an OOM) killed the executor and we
-  // triggered destroy() off the executor exit.
+  // Note: We may not see a limitation in time for it to be
+  // registered. This could occur if the limitation (e.g., an OOM)
+  // killed the executor and we triggered destroy() off the executor
+  // exit.
   bool killed = false;
   string message;
   if (limitations.contains(containerId)) {
@@ -1028,8 +1116,8 @@ void MesosContainerizerProcess::limited(
               << " and will be terminated";
     limitations.put(containerId, future.get());
   } else {
-    // TODO(idownes): A discarded future will not be an error when isolators
-    // discard their promises after cleanup.
+    // TODO(idownes): A discarded future will not be an error when
+    // isolators discard their promises after cleanup.
     LOG(ERROR) << "Error in a resource limitation for container "
                << containerId << ": " << (future.isFailed() ? future.failure()
                                                             : "discarded");

http://git-wip-us.apache.org/repos/asf/mesos/blob/47fa5a11/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index bf246ca..ab3bb6f 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -193,8 +193,8 @@ private:
   // cleanup.
   void ___destroy(
       const ContainerID& containerId,
-      const process::Future<Option<int > >& status,
-      const process::Future<std::list<Nothing> >& futures);
+      const process::Future<Option<int> >& status,
+      const process::Future<std::list<process::Future<Nothing>>>& cleanups);
 
   // Call back for when an isolator limits a container and impacts the
   // processes. This will trigger container destruction.

Reply via email to