Repository: mesos Updated Branches: refs/heads/master b4938753a -> f511395e8
Serialize isolator prepare and cleanup (reversed). Review: https://reviews.apache.org/r/25861 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47fa5a11 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47fa5a11 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47fa5a11 Branch: refs/heads/master Commit: 47fa5a11ba09614c4b780b16da64bf1c276f50ef Parents: 8e6e36a Author: Ian Downes <[email protected]> Authored: Thu Sep 11 23:17:11 2014 -0700 Committer: Ian Downes <[email protected]> Committed: Mon Oct 27 10:36:36 2014 -0700 ---------------------------------------------------------------------- src/slave/containerizer/mesos/containerizer.cpp | 202 +++++++++++++------ src/slave/containerizer/mesos/containerizer.hpp | 4 +- 2 files changed, 147 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/47fa5a11/src/slave/containerizer/mesos/containerizer.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp index 9f745d8..ce92878 100644 --- a/src/slave/containerizer/mesos/containerizer.cpp +++ b/src/slave/containerizer/mesos/containerizer.cpp @@ -451,22 +451,51 @@ Future<bool> MesosContainerizerProcess::launch( checkpoint); } -Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare( + +static list<Option<CommandInfo>> accumulate( + list<Option<CommandInfo>> l, + const Option<CommandInfo>& e) +{ + l.push_back(e); + return l; +} + + +static Future<list<Option<CommandInfo>>> _prepare( + const Owned<Isolator>& isolator, + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const string& directory, + const list<Option<CommandInfo>> commands) +{ + // Propagate any failure. 
+ return isolator->prepare(containerId, executorInfo, directory) + .then(lambda::bind(&accumulate, commands, lambda::_1)); +} + + +Future<list<Option<CommandInfo>>> MesosContainerizerProcess::prepare( const ContainerID& containerId, const ExecutorInfo& executorInfo, const string& directory, const Option<string>& user) { - // Start preparing all isolators (in parallel) and gather any additional - // preparation comands that must be run in the forked child before exec'ing - // the executor. - list<Future<Option<CommandInfo> > > futures; + // We prepare the isolators sequentially according to their ordering + // to permit basic dependency specification, e.g., preparing a + // filesystem isolator before other isolators. + Future<list<Option<CommandInfo>>> f = list<Option<CommandInfo>>(); + foreach (const Owned<Isolator>& isolator, isolators) { - futures.push_back(isolator->prepare(containerId, executorInfo)); + // Chain together preparing each isolator. + f = f.then(lambda::bind(&_prepare, + isolator, + containerId, + executorInfo, + directory, + lambda::_1)); } - // Wait for all isolators to complete preparations. - return collect(futures); + return f; } @@ -738,6 +767,9 @@ Future<bool> MesosContainerizerProcess::isolate( } // Isolate the executor with each isolator. + // NOTE: This is done in parallel and is not sequenced like prepare + // or destroy because we assume there are no dependencies in + // isolation. list<Future<Nothing> > futures; foreach (const Owned<Isolator>& isolator, isolators) { futures.push_back(isolator->isolate(containerId, _pid)); @@ -753,14 +785,14 @@ Future<bool> MesosContainerizerProcess::exec( const ContainerID& containerId, int pipeWrite) { - // The container may be destroyed before we exec the executor so return - // failure here. + // The container may be destroyed before we exec the executor so + // return failure here.
if (!promises.contains(containerId)) { return Failure("Container destroyed during launch"); } - // Now that we've contained the child we can signal it to continue by - // writing to the pipe. + // Now that we've contained the child we can signal it to continue + // by writing to the pipe. char dummy; ssize_t length; while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 && @@ -790,9 +822,9 @@ Future<Nothing> MesosContainerizerProcess::update( const ContainerID& containerId, const Resources& _resources) { - // The resources hashmap won't initially contain the container's resources - // after recovery so we must check the promises hashmap (which is set during - // recovery). + // The resources hashmap won't initially contain the container's + // resources after recovery so we must check the promises hashmap + // (which is set during recovery). if (!promises.contains(containerId)) { // It is not considered a failure if the container is not known // because the slave will attempt to update the container's @@ -817,9 +849,9 @@ Future<Nothing> MesosContainerizerProcess::update( } -// Resources are used to set the limit fields in the statistics but are -// optional because they aren't known after recovery until/unless update() is -// called. +// Resources are used to set the limit fields in the statistics but +// are optional because they aren't known after recovery until/unless +// update() is called. Future<ResourceStatistics> _usage( const ContainerID& containerId, const Option<Resources>& resources, @@ -871,8 +903,8 @@ Future<ResourceStatistics> MesosContainerizerProcess::usage( } // Use await() here so we can return partial usage statistics. - // TODO(idownes): After recovery resources won't be known until after an - // update() because they aren't part of the SlaveState. + // TODO(idownes): After recovery resources won't be known until + // after an update() because they aren't part of the SlaveState. 
return await(futures) .then(lambda::bind( _usage, containerId, resources.get(containerId), lambda::_1)); @@ -899,8 +931,8 @@ void MesosContainerizerProcess::destroy(const ContainerID& containerId) launcher->destroy(containerId) .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1)); } else { - // The executor never forked so no processes to kill, go straight to - // __destroy() with status = None(). + // The executor never forked so no processes to kill, go straight + // to __destroy() with status = None(). __destroy(containerId, None()); } } @@ -910,12 +942,12 @@ void MesosContainerizerProcess::_destroy( const ContainerID& containerId, const Future<Nothing>& future) { - // Something has gone wrong and the launcher wasn't able to kill all the - // processes in the container. We cannot clean up the isolators because they - // may require that all processes have exited so just return the failure to - // the slave. - // TODO(idownes): This is a pretty bad state to be in but we should consider - // cleaning up here. + // Something has gone wrong and the launcher wasn't able to kill all + // the processes in the container. We cannot clean up the isolators + // because they may require that all processes have exited so just + // return the failure to the slave. + // TODO(idownes): This is a pretty bad state to be in but we should + // consider cleaning up here. if (!future.isReady()) { promises[containerId]->fail( "Failed to destroy container: " + @@ -925,53 +957,109 @@ void MesosContainerizerProcess::_destroy( return; } - // We've successfully killed all processes in the container so get the exit - // status of the executor when it's ready (it may already be) and continue - // the destroy. + // We've successfully killed all processes in the container so get + // the exit status of the executor when it's ready (it may already + // be) and continue the destroy. 
statuses.get(containerId).get() .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1)); } +static list<Future<Nothing>> _cleanup(const list<Future<Nothing>>& cleanups) +{ + return cleanups; +} + + +static Future<list<Future<Nothing>>> cleanup( + const Owned<Isolator>& isolator, + const ContainerID& containerId, + list<Future<Nothing>> cleanups) +{ + // Accumulate but do not propagate any failure. + Future<Nothing> cleanup = isolator->cleanup(containerId); + cleanups.push_back(cleanup); + + // Wait for the cleanup to complete/fail before returning the list. + // We use await here to asynchronously wait for the isolator to + // complete then return cleanups. + list<Future<Nothing>> cleanup_; + cleanup_.push_back(cleanup); + + return await(cleanup_) + .then(lambda::bind(&_cleanup, cleanups)); +} + + +// TODO(idownes): Use a reversed view of the container rather than +// reversing a copy. +template <typename T> +static T reversed(const T& t) +{ + T r = t; + std::reverse(r.begin(), r.end()); + return r; +} + + void MesosContainerizerProcess::__destroy( const ContainerID& containerId, const Future<Option<int > >& status) { - // Now that all processes have exited we can now clean up all isolators. - list<Future<Nothing> > futures; - foreach (const Owned<Isolator>& isolator, isolators) { - futures.push_back(isolator->cleanup(containerId)); + // We clean up each isolator in the reverse order they were + // prepared (see comment in prepare()). + Future<list<Future<Nothing>>> f = list<Future<Nothing>>(); + + foreach (const Owned<Isolator>& isolator, reversed(isolators)) { + // We'll try to clean up all isolators, waiting for each to + // complete and continuing if one fails. + f = f.then(lambda::bind(&cleanup, + isolator, + containerId, + lambda::_1)); } - // Wait for all isolators to complete cleanup before continuing. 
- collect(futures) - .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1)); + // Continue destroy when we're done trying to clean up. + f.onAny(defer(self(), + &Self::___destroy, + containerId, + status, + lambda::_1)); + + return; } void MesosContainerizerProcess::___destroy( const ContainerID& containerId, - const Future<Option<int > >& status, - const Future<list<Nothing> >& futures) + const Future<Option<int>>& status, + const Future<list<Future<Nothing>>>& cleanups) { - // Something has gone wrong with one of the Isolators and cleanup failed. - // We'll fail the container termination and remove the 'destroying' flag but - // leave all other state. The containerizer is now in a bad state because - // at least one isolator has failed to clean up. - if (!futures.isReady()) { - promises[containerId]->fail( - "Failed to clean up isolators when destroying container: " + - (futures.isFailed() ? futures.failure() : "discarded future")); - - destroying.erase(containerId); - - return; + // This should not occur because we only use the Future<list> to + // facilitate chaining. + CHECK_READY(cleanups); + + // Check cleanup succeeded for all isolators. If not, we'll fail the + // container termination and remove the 'destroying' flag but leave + // all other state. The container is now in an inconsistent state. + foreach (const Future<Nothing>& cleanup, cleanups.get()) { + if (!cleanup.isReady()) { + promises[containerId]->fail( + "Failed to clean up an isolator when destroying container '" + + stringify(containerId) + "' :" + + (cleanup.isFailed() ? cleanup.failure() : "discarded future")); + + destroying.erase(containerId); + + return; + } } // A container is 'killed' if any isolator limited it. - // Note: We may not see a limitation in time for it to be registered. This - // could occur if the limitation (e.g., an OOM) killed the executor and we - // triggered destroy() off the executor exit. 
+ // Note: We may not see a limitation in time for it to be + // registered. This could occur if the limitation (e.g., an OOM) + // killed the executor and we triggered destroy() off the executor + // exit. bool killed = false; string message; if (limitations.contains(containerId)) { @@ -1028,8 +1116,8 @@ void MesosContainerizerProcess::limited( << " and will be terminated"; limitations.put(containerId, future.get()); } else { - // TODO(idownes): A discarded future will not be an error when isolators - // discard their promises after cleanup. + // TODO(idownes): A discarded future will not be an error when + // isolators discard their promises after cleanup. LOG(ERROR) << "Error in a resource limitation for container " << containerId << ": " << (future.isFailed() ? future.failure() : "discarded"); http://git-wip-us.apache.org/repos/asf/mesos/blob/47fa5a11/src/slave/containerizer/mesos/containerizer.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp index bf246ca..ab3bb6f 100644 --- a/src/slave/containerizer/mesos/containerizer.hpp +++ b/src/slave/containerizer/mesos/containerizer.hpp @@ -193,8 +193,8 @@ private: // cleanup. void ___destroy( const ContainerID& containerId, - const process::Future<Option<int > >& status, - const process::Future<std::list<Nothing> >& futures); + const process::Future<Option<int> >& status, + const process::Future<std::list<process::Future<Nothing>>>& cleanups); // Call back for when an isolator limits a container and impacts the // processes. This will trigger container destruction.
