Even more simplification of launch paths in Docker containerizer. Review: https://reviews.apache.org/r/26615
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ce1fc54 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ce1fc54 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ce1fc54 Branch: refs/heads/master Commit: 9ce1fc5486b341782b01698486b3a3d527c57c3c Parents: 4cbcac4 Author: Benjamin Hindman <[email protected]> Authored: Sat Oct 11 14:42:20 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Fri Oct 31 15:05:39 2014 -0700 ---------------------------------------------------------------------- src/slave/containerizer/docker.cpp | 218 +++++++++++++------------------- 1 file changed, 85 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9ce1fc54/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index b211db0..73e3c40 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -139,6 +139,10 @@ private: process::Future<Nothing> _pull(const std::string& image); + Try<Nothing> checkpoint( + const ContainerID& containerId, + pid_t pid); + process::Future<Nothing> _recover( const std::list<Docker::Container>& containers); @@ -151,29 +155,13 @@ private: const std::string& directory); process::Future<bool> ___launch( - const ContainerID& containerId, - const TaskInfo& taskInfo, - const ExecutorInfo& executorInfo, - const std::string& directory, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint); - - process::Future<bool> ___launch( - const ContainerID& containerId, - const ExecutorInfo& executorInfo, - const string& directory, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint); + const ContainerID& containerId); process::Future<bool> ____launch( + const ContainerID& containerId); + + process::Future<bool> _____launch( const ContainerID& containerId, - const ExecutorInfo& executorInfo, - const string& directory, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint, const Docker::Container& container); void _destroy( @@ -357,7 +345,9 @@ private: Future<Nothing> run; // We keep track of the resources for each container so we can set - // the ResourceStatistics limits in usage(). + // the ResourceStatistics limits in usage(). Note that this is + // different than just what we might get from TaskInfo::resources + // or ExecutorInfo::resources because they can change dynamically. Resources resources; // The mesos-fetcher subprocess, kept around so that we can do a @@ -558,6 +548,32 @@ Future<Nothing> DockerContainerizerProcess::_pull(const string& image) } +Try<Nothing> DockerContainerizerProcess::checkpoint( + const ContainerID& containerId, + pid_t pid) +{ + CHECK(containers_.contains(containerId)); + + Container* container = containers_[containerId]; + + if (container->checkpoint) { + const string& path = + slave::paths::getForkedPidPath( + slave::paths::getMetaRootDir(flags.work_dir), + container->slaveId, + container->executor.framework_id(), + container->executor.executor_id(), + containerId); + + LOG(INFO) << "Checkpointing pid " << pid << " to '" << path << "'"; + + return slave::state::checkpoint(path, stringify(pid)); + } + + return Nothing(); +} + + Future<Nothing> DockerContainerizer::recover( const Option<SlaveState>& state) { @@ -852,15 +868,7 @@ Future<bool> DockerContainerizerProcess::launch( return fetch(containerId, taskInfo.command(), directory) .then(defer(self(), &Self::_launch, containerId, directory)) .then(defer(self(), &Self::__launch, containerId, directory)) - .then(defer(self(), - &Self::___launch, - containerId, - taskInfo, - executorInfo, - directory, - slaveId, - slavePid, - checkpoint)) + .then(defer(self(), &Self::___launch, containerId)) .onFailed(defer(self(), &Self::destroy, containerId, true)); } @@ -916,31 +924,27 @@ Future<Nothing> DockerContainerizerProcess::__launch( Future<bool> DockerContainerizerProcess::___launch( - const ContainerID& containerId, - const TaskInfo& taskInfo, - const ExecutorInfo& executorInfo, - const string& directory, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint) + const ContainerID& containerId) { // After we do Docker::run we shouldn't remove a container until // after we set 'status', which we do in this function. CHECK(containers_.contains(containerId)); + Container* container = containers_[containerId]; + // Prepare environment variables for the executor. - map<string, string> env = executorEnvironment( - executorInfo, - directory, - slaveId, - slavePid, - checkpoint, + map<string, string> environment = executorEnvironment( + container->executor, + container->directory, + container->slaveId, + container->slavePid, + container->checkpoint, flags.recovery_timeout); // Include any enviroment variables from ExecutorInfo. foreach (const Environment::Variable& variable, - executorInfo.command().environment().variables()) { - env[variable.name()] = variable.value(); + container->executor.command().environment().variables()) { + environment[variable.name()] = variable.value(); } // Construct the mesos-executor "override" to do a 'docker wait' @@ -949,46 +953,30 @@ Future<bool> DockerContainerizerProcess::___launch( // don't want the exit status from 'docker wait' but rather the exit // status from the container, hence the use of /bin/bash. string override = - "/bin/sh -c 'exit `" + - flags.docker + " wait " + containers_[containerId]->name() + "`'"; + "/bin/sh -c 'exit `" + flags.docker + " wait " + container->name() + "`'"; Try<Subprocess> s = subprocess( - executorInfo.command().value() + " --override " + override, + container->executor.command().value() + " --override " + override, Subprocess::PIPE(), - Subprocess::PATH(path::join(directory, "stdout")), - Subprocess::PATH(path::join(directory, "stderr")), - env, - lambda::bind(&setup, directory)); + Subprocess::PATH(path::join(container->directory, "stdout")), + Subprocess::PATH(path::join(container->directory, "stderr")), + environment, + lambda::bind(&setup, container->directory)); if (s.isError()) { return Failure("Failed to fork executor: " + s.error()); } - // Checkpoint the executor's pid if requested. - if (checkpoint) { - const string& path = slave::paths::getForkedPidPath( - slave::paths::getMetaRootDir(flags.work_dir), - slaveId, - executorInfo.framework_id(), - executorInfo.executor_id(), - containerId); - - LOG(INFO) << "Checkpointing executor's forked pid " - << s.get().pid() << " to '" << path << "'"; + // Checkpoint the executor's pid (if necessary). + Try<Nothing> checkpointed = checkpoint(containerId, s.get().pid()); - Try<Nothing> checkpointed = - slave::state::checkpoint(path, stringify(s.get().pid())); - - if (checkpointed.isError()) { - LOG(ERROR) << "Failed to checkpoint executor's forked pid to '" - << path << "': " << checkpointed.error(); - - // Close the subprocess's stdin so that it aborts. - CHECK_SOME(s.get().in()); - os::close(s.get().in().get()); + if (checkpointed.isError()) { + // Close the subprocess's stdin so that it aborts. + CHECK_SOME(s.get().in()); + os::close(s.get().in().get()); - return Failure("Could not checkpoint executor's pid"); - } + return Failure( + "Failed to checkpoint executor's pid: " + checkpointed.error()); } // Checkpoing complete, now synchronize with the process so that it @@ -1006,16 +994,17 @@ Future<bool> DockerContainerizerProcess::___launch( } // Store the resources for usage(). - containers_[containerId]->resources = taskInfo.resources(); + CHECK_SOME(container->task); + container->resources = container->task.get().resources(); // And finally watch for when the executor gets reaped. - containers_[containerId]->status.set(process::reap(s.get().pid())); + container->status.set(process::reap(s.get().pid())); - containers_[containerId]->status.future().get() + container->status.future().get() .onAny(defer(self(), &Self::reaped, containerId)); // TODO(benh): Check failure of Docker::logs. - docker.logs(containers_[containerId]->name(), directory); + docker.logs(container->name(), container->directory); return true; } @@ -1070,48 +1059,24 @@ Future<bool> DockerContainerizerProcess::launch( return fetch(containerId, executorInfo.command(), directory) .then(defer(self(), &Self::_launch, containerId, directory)) .then(defer(self(), &Self::__launch, containerId, directory)) - .then(defer(self(), - &Self::___launch, - containerId, - executorInfo, - directory, - slaveId, - slavePid, - checkpoint)) + .then(defer(self(), &Self::____launch, containerId)) .onFailed(defer(self(), &Self::destroy, containerId, true)); } -Future<bool> DockerContainerizerProcess::___launch( - const ContainerID& containerId, - const ExecutorInfo& executorInfo, - const string& directory, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint) +Future<bool> DockerContainerizerProcess::____launch( + const ContainerID& containerId) { // We shouldn't remove container until we set 'status'. CHECK(containers_.contains(containerId)); + return docker.inspect(containers_[containerId]->name()) - .then(defer(self(), - &Self::____launch, - containerId, - executorInfo, - directory, - slaveId, - slavePid, - checkpoint, - lambda::_1)); + .then(defer(self(), &Self::_____launch, containerId, lambda::_1)); } -Future<bool> DockerContainerizerProcess::____launch( +Future<bool> DockerContainerizerProcess::_____launch( const ContainerID& containerId, - const ExecutorInfo& executorInfo, - const string& directory, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint, const Docker::Container& container) { // After we do Docker::run we shouldn't remove a container until @@ -1124,34 +1089,21 @@ Future<bool> DockerContainerizerProcess::____launch( return Failure("Unable to get executor pid after launch"); } - if (checkpoint) { - // TODO(tnachen): We might not be able to checkpoint if the slave - // dies before it can checkpoint while the executor is still - // running. Optinally we can consider recording the slave id and - // executor id as part of the docker container name so we can - // recover from this. - const string& path = - slave::paths::getForkedPidPath( - slave::paths::getMetaRootDir(flags.work_dir), - slaveId, - executorInfo.framework_id(), - executorInfo.executor_id(), - containerId); + // TODO(tnachen): We might not be able to checkpoint if the slave + // dies before it can checkpoint while the executor is still + // running. Optinally we can consider recording the slave id and + // executor id as part of the docker container name so we can + // recover from this. - LOG(INFO) << "Checkpointing executor's forked pid " - << pid.get() << " to '" << path << "'"; + Try<Nothing> checkpointed = checkpoint(containerId, pid.get()); - Try<Nothing> checkpointed = - slave::state::checkpoint(path, stringify(pid.get())); - - if (checkpointed.isError()) { - return Failure("Failed to checkpoint executor's forked pid to '" - + path + "': " + checkpointed.error()); - } + if (checkpointed.isError()) { + return Failure( + "Failed to checkpoint executor's pid: " + checkpointed.error()); } // Store the resources for usage(). - containers_[containerId]->resources = executorInfo.resources(); + containers_[containerId]->resources = containers_[containerId]->executor.resources(); // And finally watch for when the container gets reaped. containers_[containerId]->status.set(process::reap(pid.get())); @@ -1160,7 +1112,7 @@ Future<bool> DockerContainerizerProcess::____launch( .onAny(defer(self(), &Self::reaped, containerId)); // TODO(benh): Check failure of Docker::logs. - docker.logs(containers_[containerId]->name(), directory); + docker.logs(containers_[containerId]->name(), containers_[containerId]->directory); return true; }
