Repository: mesos Updated Branches: refs/heads/master 75bf214e6 -> 25489e53e
Allow mesos containerizer to destroy correctly at each state. Review: https://reviews.apache.org/r/28141 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/25489e53 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/25489e53 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/25489e53 Branch: refs/heads/master Commit: 25489e53e9f308c5fca3d0293aeceb716b53149d Parents: 75bf214 Author: Timothy Chen <[email protected]> Authored: Sat Oct 25 18:52:21 2014 -0700 Committer: Timothy Chen <[email protected]> Committed: Fri Nov 21 15:31:42 2014 -0800 ---------------------------------------------------------------------- src/slave/containerizer/mesos/containerizer.cpp | 231 +++++++++++++------ src/slave/containerizer/mesos/containerizer.hpp | 96 +++++--- src/tests/containerizer_tests.cpp | 93 ++++++++ 3 files changed, 312 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/25489e53/src/slave/containerizer/mesos/containerizer.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp index 24f90b6..55c2922 100644 --- a/src/slave/containerizer/mesos/containerizer.cpp +++ b/src/slave/containerizer/mesos/containerizer.cpp @@ -173,25 +173,31 @@ MesosContainerizer::MesosContainerizer( bool local, const Owned<Launcher>& launcher, const vector<Owned<Isolator>>& isolators) + : process(new MesosContainerizerProcess(flags, local, launcher, isolators)) { - process = new MesosContainerizerProcess( - flags, local, launcher, isolators); - spawn(process); + spawn(process.get()); +} + + +MesosContainerizer::MesosContainerizer( + const Owned<MesosContainerizerProcess>& _process) + : process(_process) +{ + spawn(process.get()); } MesosContainerizer::~MesosContainerizer() { - terminate(process); - 
process::wait(process); - delete process; + terminate(process.get()); + process::wait(process.get()); } Future<Nothing> MesosContainerizer::recover( const Option<state::SlaveState>& state) { - return dispatch(process, &MesosContainerizerProcess::recover, state); + return dispatch(process.get(), &MesosContainerizerProcess::recover, state); } @@ -204,7 +210,7 @@ Future<bool> MesosContainerizer::launch( const PID<Slave>& slavePid, bool checkpoint) { - return dispatch(process, + return dispatch(process.get(), &MesosContainerizerProcess::launch, containerId, executorInfo, @@ -226,7 +232,7 @@ Future<bool> MesosContainerizer::launch( const PID<Slave>& slavePid, bool checkpoint) { - return dispatch(process, + return dispatch(process.get(), &MesosContainerizerProcess::launch, containerId, taskInfo, @@ -243,7 +249,7 @@ Future<Nothing> MesosContainerizer::update( const ContainerID& containerId, const Resources& resources) { - return dispatch(process, + return dispatch(process.get(), &MesosContainerizerProcess::update, containerId, resources); @@ -253,26 +259,26 @@ Future<Nothing> MesosContainerizer::update( Future<ResourceStatistics> MesosContainerizer::usage( const ContainerID& containerId) { - return dispatch(process, &MesosContainerizerProcess::usage, containerId); + return dispatch(process.get(), &MesosContainerizerProcess::usage, containerId); } Future<containerizer::Termination> MesosContainerizer::wait( const ContainerID& containerId) { - return dispatch(process, &MesosContainerizerProcess::wait, containerId); + return dispatch(process.get(), &MesosContainerizerProcess::wait, containerId); } void MesosContainerizer::destroy(const ContainerID& containerId) { - dispatch(process, &MesosContainerizerProcess::destroy, containerId); + dispatch(process.get(), &MesosContainerizerProcess::destroy, containerId); } Future<hashset<ContainerID>> MesosContainerizer::containers() { - return dispatch(process, &MesosContainerizerProcess::containers); + return dispatch(process.get(), 
&MesosContainerizerProcess::containers); } @@ -356,16 +362,20 @@ Future<Nothing> MesosContainerizerProcess::__recover( { foreach (const RunState& run, recovered) { CHECK_SOME(run.id); - const ContainerID& containerId = run.id.get(); + CHECK_SOME(run.forkedPid); - Owned<Promise<containerizer::Termination>> promise( - new Promise<containerizer::Termination>()); - promises.put(containerId, promise); + const ContainerID& containerId = run.id.get(); - CHECK_SOME(run.forkedPid); + Container* container = new Container(); Future<Option<int>> status = process::reap(run.forkedPid.get()); - statuses[containerId] = status; status.onAny(defer(self(), &Self::reaped, containerId)); + container->status = status; + // We only checkpoint the containerizer pid after the container + // successfully launched, therefore we can assume checkpointed + // containers should be running after recover. + container->state = RUNNING; + + containers_[containerId] = Owned<Container>(container); foreach (const Owned<Isolator>& isolator, isolators) { isolator->watch(containerId) @@ -395,7 +405,7 @@ Future<bool> MesosContainerizerProcess::launch( const PID<Slave>& slavePid, bool checkpoint) { - if (promises.contains(containerId)) { + if (containers_.contains(containerId)) { LOG(ERROR) << "Cannot start already running container '" << containerId << "'"; return Failure("Container already started"); @@ -415,12 +425,10 @@ Future<bool> MesosContainerizerProcess::launch( return false; } - Owned<Promise<containerizer::Termination>> promise( - new Promise<containerizer::Termination>()); - promises.put(containerId, promise); - - // Store the resources for usage(). 
- resources.put(containerId, executorInfo.resources()); + Container* container = new Container(); + container->resources = executorInfo.resources(); + container->state = PREPARING; + containers_[containerId] = Owned<Container>(container); LOG(INFO) << "Starting container '" << containerId << "' for executor '" << executorInfo.executor_id() @@ -525,6 +533,14 @@ Future<Nothing> MesosContainerizerProcess::fetch( const string& directory, const Option<string>& user) { + if (!containers_.contains(containerId)) { + return Failure("Container is already destroyed"); + } + + if (commandInfo.uris().size() == 0) { + return Nothing(); + } + Try<Subprocess> fetcher = fetcher::run( commandInfo, directory, @@ -535,6 +551,13 @@ Future<Nothing> MesosContainerizerProcess::fetch( return Failure("Failed to execute mesos-fetcher: " + fetcher.error()); } + // TODO(tnachen): Currently the fetcher won't shutdown when slave + // exits. This means the fetcher will still be running when slave + // restarts and after recovering. We won't resume the task since + // it hasn't checkpointed yet. Once the fetcher supports existing + // on slave it will be removed automatically. + containers_[containerId]->fetcher = fetcher.get(); + return fetcher.get().status() .then(lambda::bind(&fetcher::_run, containerId, lambda::_1)); } @@ -550,6 +573,14 @@ Future<bool> MesosContainerizerProcess::_launch( bool checkpoint, const list<Option<CommandInfo>>& commands) { + if (!containers_.contains(containerId)) { + return Failure("Container has been destroyed"); + } + + if (containers_[containerId]->state == DESTROYING) { + return Failure("Container is currently being destroyed"); + } + // Prepare environment variables for the executor. map<string, string> env = executorEnvironment( executorInfo, @@ -643,8 +674,8 @@ Future<bool> MesosContainerizerProcess::_launch( // Monitor the executor's pid. We keep the future because we'll // refer to it again during container destroy. 
Future<Option<int>> status = process::reap(pid); - statuses.put(containerId, status); status.onAny(defer(self(), &Self::reaped, containerId)); + containers_[containerId]->status = status; return isolate(containerId, pid) .then(defer(self(), @@ -669,6 +700,10 @@ Future<bool> MesosContainerizerProcess::isolate( const ContainerID& containerId, pid_t _pid) { + CHECK(containers_.contains(containerId)); + + containers_[containerId]->state = ISOLATING; + // Set up callbacks for isolator limitations. foreach (const Owned<Isolator>& isolator, isolators) { isolator->watch(containerId) @@ -685,8 +720,11 @@ Future<bool> MesosContainerizerProcess::isolate( } // Wait for all isolators to complete. - return collect(futures) - .then(lambda::bind(&_isolate)); + Future<list<Nothing>> future = collect(futures); + + containers_[containerId]->isolation = future; + + return future.then(lambda::bind(&_isolate)); } @@ -696,7 +734,8 @@ Future<bool> MesosContainerizerProcess::exec( { // The container may be destroyed before we exec the executor so // return failure here. 
- if (!promises.contains(containerId)) { + if (!containers_.contains(containerId) || + containers_[containerId]->state == DESTROYING) { return Failure("Container destroyed during launch"); } @@ -712,6 +751,8 @@ Future<bool> MesosContainerizerProcess::exec( string(strerror(errno))); } + containers_[containerId]->state = RUNNING; + return true; } @@ -719,11 +760,11 @@ Future<bool> MesosContainerizerProcess::exec( Future<containerizer::Termination> MesosContainerizerProcess::wait( const ContainerID& containerId) { - if (!promises.contains(containerId)) { + if (!containers_.contains(containerId)) { return Failure("Unknown container: " + stringify(containerId)); } - return promises[containerId]->future(); + return containers_[containerId]->promise.future(); } @@ -731,10 +772,7 @@ Future<Nothing> MesosContainerizerProcess::update( const ContainerID& containerId, const Resources& _resources) { - // The resources hashmap won't initially contain the container's - // resources after recovery so we must check the promises hashmap - // (which is set during recovery). - if (!promises.contains(containerId)) { + if (!containers_.contains(containerId)) { // It is not considered a failure if the container is not known // because the slave will attempt to update the container's // resources on a task's terminal state change but the executor @@ -743,8 +781,14 @@ Future<Nothing> MesosContainerizerProcess::update( return Nothing(); } + if (containers_[containerId]->state == DESTROYING) { + LOG(WARNING) << "Ignoring update for currently being destroyed container: " + << containerId; + return Nothing(); + } + // Store the resources for usage(). - resources.put(containerId, _resources); + containers_[containerId]->resources = _resources; // Update each isolator. 
list<Future<Nothing>> futures; @@ -802,7 +846,7 @@ Future<ResourceStatistics> _usage( Future<ResourceStatistics> MesosContainerizerProcess::usage( const ContainerID& containerId) { - if (!promises.contains(containerId)) { + if (!containers_.contains(containerId)) { return Failure("Unknown container: " + stringify(containerId)); } @@ -816,41 +860,81 @@ Future<ResourceStatistics> MesosContainerizerProcess::usage( // after an update() because they aren't part of the SlaveState. return await(futures) .then(lambda::bind( - _usage, containerId, resources.get(containerId), lambda::_1)); + _usage, + containerId, + containers_[containerId]->resources, + lambda::_1)); } void MesosContainerizerProcess::destroy(const ContainerID& containerId) { - if (!promises.contains(containerId)) { + if (!containers_.contains(containerId)) { LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId; return; } - if (destroying.contains(containerId)) { + Container* container = containers_[containerId].get(); + + if (container->state == DESTROYING) { // Destroy has already been initiated. return; } - destroying.insert(containerId); LOG(INFO) << "Destroying container '" << containerId << "'"; - if (statuses.contains(containerId)) { - // Kill all processes then continue destruction. - launcher->destroy(containerId) - .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1)); - } else { - // The executor never forked so no processes to kill, go straight - // to __destroy() with status = None(). - __destroy(containerId, None()); + if (container->state == PREPARING) { + // We cannot simply terminate the container if it's preparing + // since isolator's prepare doesn't need any cleanup. 
+ containerizer::Termination termination; + termination.set_killed(true); + termination.set_message("Container destroyed while preparing isolators"); + container->promise.set(termination); + + containers_.erase(containerId); + + return; + } + + if (container->state == FETCHING && container->fetcher.isSome()) { + VLOG(1) << "Killing the fetcher for container '" << containerId << "'"; + // Best effort kill the entire fetcher tree. + os::killtree(container->fetcher.get().pid(), SIGKILL); + } + + if (container->state == ISOLATING) { + VLOG(1) << "Waiting for the isolators to complete for container '" + << containerId << "'"; + + container->state = DESTROYING; + + // Wait for the isolators to finish isolating before we start + // to destroy the container. + container->isolation + .onAny(defer(self(), &Self::_destroy, containerId)); + + return; } + + container->state = DESTROYING; + _destroy(containerId); } -void MesosContainerizerProcess::_destroy( +void MesosContainerizerProcess::_destroy(const ContainerID& containerId) +{ + // Kill all processes then continue destruction. + launcher->destroy(containerId) + .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1)); +} + + +void MesosContainerizerProcess::__destroy( const ContainerID& containerId, const Future<Nothing>& future) { + CHECK(containers_.contains(containerId)); + // Something has gone wrong and the launcher wasn't able to kill all // the processes in the container. We cannot clean up the isolators // because they may require that all processes have exited so just @@ -858,19 +942,20 @@ void MesosContainerizerProcess::_destroy( // TODO(idownes): This is a pretty bad state to be in but we should // consider cleaning up here. if (!future.isReady()) { - promises[containerId]->fail( + containers_[containerId]->promise.fail( "Failed to destroy container " + stringify(containerId) + ": " + (future.isFailed() ? 
future.failure() : "discarded future")); - destroying.erase(containerId); + containers_.erase(containerId); + return; } // We've successfully killed all processes in the container so get // the exit status of the executor when it's ready (it may already // be) and continue the destroy. - statuses.get(containerId).get() - .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1)); + containers_[containerId]->status + .onAny(defer(self(), &Self::___destroy, containerId, lambda::_1)); } @@ -911,7 +996,7 @@ static T reversed(const T& t) } -void MesosContainerizerProcess::__destroy( +void MesosContainerizerProcess::___destroy( const ContainerID& containerId, const Future<Option<int>>& status) { @@ -930,7 +1015,7 @@ void MesosContainerizerProcess::__destroy( // Continue destroy when we're done trying to clean up. f.onAny(defer(self(), - &Self::___destroy, + &Self::____destroy, containerId, status, lambda::_1)); @@ -939,7 +1024,7 @@ void MesosContainerizerProcess::__destroy( } -void MesosContainerizerProcess::___destroy( +void MesosContainerizerProcess::____destroy( const ContainerID& containerId, const Future<Option<int>>& status, const Future<list<Future<Nothing>>>& cleanups) @@ -947,18 +1032,21 @@ void MesosContainerizerProcess::___destroy( // This should not occur because we only use the Future<list> to // facilitate chaining. CHECK_READY(cleanups); + CHECK(containers_.contains(containerId)); + + Container* container = containers_[containerId].get(); // Check cleanup succeeded for all isolators. If not, we'll fail the // container termination and remove the 'destroying' flag but leave // all other state. The container is now in an inconsistent state. foreach (const Future<Nothing>& cleanup, cleanups.get()) { if (!cleanup.isReady()) { - promises[containerId]->fail( + container->promise.fail( "Failed to clean up an isolator when destroying container '" + stringify(containerId) + "' :" + (cleanup.isFailed() ? 
cleanup.failure() : "discarded future")); - destroying.erase(containerId); + containers_.erase(containerId); return; } @@ -971,9 +1059,9 @@ void MesosContainerizerProcess::___destroy( // exit. bool killed = false; string message; - if (limitations.contains(containerId)) { + if (container->limitations.size() > 0) { killed = true; - foreach (const Limitation& limitation, limitations.get(containerId)) { + foreach (const Limitation& limitation, container->limitations) { message += limitation.message; } message = strings::trim(message); @@ -988,19 +1076,15 @@ void MesosContainerizerProcess::___destroy( termination.set_status(status.get().get()); } - promises[containerId]->set(termination); + container->promise.set(termination); - promises.erase(containerId); - statuses.erase(containerId); - limitations.erase(containerId); - resources.erase(containerId); - destroying.erase(containerId); + containers_.erase(containerId); } void MesosContainerizerProcess::reaped(const ContainerID& containerId) { - if (!promises.contains(containerId)) { + if (!containers_.contains(containerId)) { return; } @@ -1015,7 +1099,8 @@ void MesosContainerizerProcess::limited( const ContainerID& containerId, const Future<Limitation>& future) { - if (!promises.contains(containerId)) { + if (!containers_.contains(containerId) || + containers_[containerId]->state == DESTROYING) { return; } @@ -1023,7 +1108,7 @@ void MesosContainerizerProcess::limited( LOG(INFO) << "Container " << containerId << " has reached its limit for" << " resource " << future.get().resource << " and will be terminated"; - limitations.put(containerId, future.get()); + containers_[containerId]->limitations.push_back(future.get()); } else { // TODO(idownes): A discarded future will not be an error when // isolators discard their promises after cleanup. 
@@ -1039,7 +1124,7 @@ void MesosContainerizerProcess::limited( Future<hashset<ContainerID>> MesosContainerizerProcess::containers() { - return promises.keys(); + return containers_.keys(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/25489e53/src/slave/containerizer/mesos/containerizer.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp index 3baea31..0b635d4 100644 --- a/src/slave/containerizer/mesos/containerizer.hpp +++ b/src/slave/containerizer/mesos/containerizer.hpp @@ -47,6 +47,9 @@ public: const process::Owned<Launcher>& launcher, const std::vector<process::Owned<Isolator>>& isolators); + // Used for testing. + MesosContainerizer(const process::Owned<MesosContainerizerProcess>& _process); + virtual ~MesosContainerizer(); virtual process::Future<Nothing> recover( @@ -86,7 +89,7 @@ public: virtual process::Future<hashset<ContainerID>> containers(); private: - MesosContainerizerProcess* process; + process::Owned<MesosContainerizerProcess> process; }; @@ -106,10 +109,10 @@ public: virtual ~MesosContainerizerProcess() {} - process::Future<Nothing> recover( + virtual process::Future<Nothing> recover( const Option<state::SlaveState>& state); - process::Future<bool> launch( + virtual process::Future<bool> launch( const ContainerID& containerId, const ExecutorInfo& executorInfo, const std::string& directory, @@ -118,7 +121,7 @@ public: const process::PID<Slave>& slavePid, bool checkpoint); - process::Future<bool> launch( + virtual process::Future<bool> launch( const ContainerID& containerId, const TaskInfo& taskInfo, const ExecutorInfo& executorInfo, @@ -128,19 +131,23 @@ public: const process::PID<Slave>& slavePid, bool checkpoint); - process::Future<Nothing> update( + virtual process::Future<Nothing> update( const ContainerID& containerId, const Resources& resources); - process::Future<ResourceStatistics> usage( + virtual 
process::Future<ResourceStatistics> usage( const ContainerID& containerId); - process::Future<containerizer::Termination> wait( + virtual process::Future<containerizer::Termination> wait( const ContainerID& containerId); - void destroy(const ContainerID& containerId); + virtual process::Future<bool> exec( + const ContainerID& containerId, + int pipeWrite); + + virtual void destroy(const ContainerID& containerId); - process::Future<hashset<ContainerID>> containers(); + virtual process::Future<hashset<ContainerID>> containers(); private: process::Future<Nothing> _recover( @@ -175,23 +182,22 @@ private: const ContainerID& containerId, pid_t _pid); - process::Future<bool> exec( - const ContainerID& containerId, - int pipeWrite); + // Continues 'destroy()' once isolators have completed. + void _destroy(const ContainerID& containerId); // Continues 'destroy()' once all processes have been killed by the launcher. - void _destroy( + void __destroy( const ContainerID& containerId, const process::Future<Nothing>& future); // Continues '_destroy()' once we get the exit status of the executor. - void __destroy( + void ___destroy( const ContainerID& containerId, const process::Future<Option<int>>& status); // Continues (and completes) '__destroy()' once all isolators have completed // cleanup. - void ___destroy( + void ____destroy( const ContainerID& containerId, const process::Future<Option<int>>& status, const process::Future<std::list<process::Future<Nothing>>>& cleanups); @@ -211,26 +217,46 @@ private: const process::Owned<Launcher> launcher; const std::vector<process::Owned<Isolator>> isolators; - // TODO(idownes): Consider putting these per-container variables into a - // struct. - // Promises for futures returned from wait(). - hashmap<ContainerID, - process::Owned<process::Promise<containerizer::Termination>>> promises; - - // We need to keep track of the future exit status for each executor because - // we'll only get a single notification when the executor exits. 
- hashmap<ContainerID, process::Future<Option<int>>> statuses; - - // We keep track of any limitations received from each isolator so we can - // determine the cause of an executor termination. - multihashmap<ContainerID, Limitation> limitations; - - // We keep track of the resources for each container so we can set the - // ResourceStatistics limits in usage(). - hashmap<ContainerID, Resources> resources; - - // Set of containers that are in process of being destroyed. - hashset<ContainerID> destroying; + enum State + { + PREPARING, + ISOLATING, + FETCHING, + RUNNING, + DESTROYING + }; + + struct Container + { + // Promise for futures returned from wait(). + process::Promise<containerizer::Termination> promise; + + // We need to keep track of the future exit status for each + // executor because we'll only get a single notification when + // the executor exits. + process::Future<Option<int>> status; + + // We keep track of the future that is waiting for all the + // isolators' futures, so that destroy will only start calling + // cleanup after all isolators have finished isolating. + process::Future<std::list<Nothing>> isolation; + + // We keep track of any limitations received from each isolator so we can + // determine the cause of an executor termination. + std::vector<Limitation> limitations; + + // The mesos-fetcher subprocess, that we keep around so we can + // stop the fetcher when the container is destroyed. + Option<process::Subprocess> fetcher; + + // We keep track of the resources for each container so we can set the + // ResourceStatistics limits in usage(). 
+ Resources resources; + + State state; + }; + + hashmap<ContainerID, process::Owned<Container>> containers_; }; } // namespace slave { http://git-wip-us.apache.org/repos/asf/mesos/blob/25489e53/src/tests/containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp index a63897b..02a5f15 100644 --- a/src/tests/containerizer_tests.cpp +++ b/src/tests/containerizer_tests.cpp @@ -49,6 +49,11 @@ using std::map; using std::string; using std::vector; +using testing::_; +using testing::DoAll; +using testing::Invoke; +using testing::Return; + class MesosContainerizerIsolatorPreparationTest : public tests::TemporaryDirectoryTest { @@ -291,3 +296,91 @@ TEST_F(MesosContainerizerExecuteTest, IoRedirection) delete containerizer.get(); } + + +class MesosContainerizerDestroyTest : public tests::TemporaryDirectoryTest {}; + +class MockMesosContainerizerProcess : public MesosContainerizerProcess +{ +public: + MockMesosContainerizerProcess( + const Flags& flags, + bool local, + const process::Owned<Launcher>& launcher, + const std::vector<process::Owned<Isolator>>& isolators) + : MesosContainerizerProcess(flags, local, launcher, isolators) + { + // NOTE: See TestContainerizer::setup for why we use + // 'EXPECT_CALL' and 'WillRepeatedly' here instead of + // 'ON_CALL' and 'WillByDefault'. + EXPECT_CALL(*this, exec(_, _)) + .WillRepeatedly(Invoke(this, &MockMesosContainerizerProcess::_exec)); + } + + MOCK_METHOD2( + exec, + process::Future<bool>( + const ContainerID& containerId, + int pipeWrite)); + + process::Future<bool> _exec( + const ContainerID& containerId, + int pipeWrite) + { + return MesosContainerizerProcess::exec( + containerId, + pipeWrite); + } +}; + + +// Destroying a mesos containerizer while it is fetching should +// complete without waiting for the fetching to finish. 
+TEST_F(MesosContainerizerDestroyTest, DestroyWhileFetching) +{ + slave::Flags flags; + Try<Launcher*> launcher = PosixLauncher::create(flags); + ASSERT_SOME(launcher); + std::vector<process::Owned<Isolator>> isolators; + + MockMesosContainerizerProcess* process = new MockMesosContainerizerProcess( + flags, + true, + Owned<Launcher>(launcher.get()), + isolators); + + Future<Nothing> exec; + Promise<bool> promise; + // Letting exec hang to simulate a long fetch. + EXPECT_CALL(*process, exec(_, _)) + .WillOnce(DoAll(FutureSatisfy(&exec), + Return(promise.future()))); + + MesosContainerizer containerizer((Owned<MesosContainerizerProcess>(process))); + + ContainerID containerId; + containerId.set_value("test_container"); + + TaskInfo taskInfo; + CommandInfo commandInfo; + taskInfo.mutable_command()->MergeFrom(commandInfo); + + containerizer.launch( + containerId, + taskInfo, + CREATE_EXECUTOR_INFO("executor", "exit 0"), + os::getcwd(), + None(), + SlaveID(), + process::PID<Slave>(), + false); + + Future<containerizer::Termination> wait = containerizer.wait(containerId); + + AWAIT_READY(exec); + + containerizer.destroy(containerId); + + // The container should still exit even if fetch didn't complete. + AWAIT_READY(wait); +}
