Repository: mesos
Updated Branches:
  refs/heads/master 76873d345 -> b16999a4c
Send docker inspect output with TaskStatus data. Review: https://reviews.apache.org/r/34654 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b16999a4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b16999a4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b16999a4 Branch: refs/heads/master Commit: b16999a4cb92ad35e97d29e7e99d897063f8bdca Parents: 76873d3 Author: Timothy Chen <[email protected]> Authored: Sat May 23 22:47:49 2015 -0700 Committer: Timothy Chen <[email protected]> Committed: Sun May 31 22:17:28 2015 -0700 ---------------------------------------------------------------------- src/docker/docker.cpp | 58 ++++++------ src/docker/docker.hpp | 9 +- src/docker/executor.cpp | 124 +++++++++++++++----------- src/tests/docker_containerizer_tests.cpp | 4 + 4 files changed, 111 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b16999a4/src/docker/docker.cpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp index ee74da5..7138329 100644 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -204,8 +204,24 @@ Future<Version> Docker::__version(const Future<string>& output) } -Try<Docker::Container> Docker::Container::create(const JSON::Object& json) +Try<Docker::Container> Docker::Container::create(const string& output) { + Try<JSON::Array> parse = JSON::parse<JSON::Array>(output); + if (parse.isError()) { + return Error("Failed to parse JSON: " + parse.error()); + } + + // TODO(benh): Handle the case where the short container ID was + // not sufficiently unique and 'array.values.size() > 1'. 
+ JSON::Array array = parse.get(); + if (array.values.size() != 1) { + return Error("Failed to find container"); + } + + CHECK(array.values.front().is<JSON::Object>()); + + JSON::Object json = array.values.front().as<JSON::Object>(); + Result<JSON::String> idValue = json.find<JSON::String>("Id"); if (idValue.isNone()) { return Error("Unable to find Id in container"); @@ -255,7 +271,7 @@ Try<Docker::Container> Docker::Container::create(const JSON::Object& json) bool started = startedAtValue.get().value != "0001-01-01T00:00:00Z"; - return Docker::Container(id, name, optionalPid, started); + return Docker::Container(output, id, name, optionalPid, started); } @@ -725,41 +741,23 @@ void Docker::___inspect( return; } - Try<JSON::Array> parse = JSON::parse<JSON::Array>(output.get()); + Try<Docker::Container> container = Docker::Container::create( + output.get()); - if (parse.isError()) { - promise->fail("Failed to parse JSON: " + parse.error()); + if (container.isError()) { + promise->fail("Unable to create container: " + container.error()); return; } - JSON::Array array = parse.get(); - // Only return if only one container identified with name. - if (array.values.size() == 1) { - CHECK(array.values.front().is<JSON::Object>()); - Try<Docker::Container> container = - Docker::Container::create(array.values.front().as<JSON::Object>()); - - if (container.isError()) { - promise->fail("Unable to create container: " + container.error()); - return; - } - - if (retryInterval.isSome() && !container.get().started) { - VLOG(1) << "Retrying inspect since container not yet started. cmd: '" - << cmd << "', interval: " << stringify(retryInterval.get()); - Clock::timer(retryInterval.get(), - [=]() { _inspect(cmd, promise, retryInterval); } ); - return; - } - - promise->set(container.get()); + if (retryInterval.isSome() && !container.get().started) { + VLOG(1) << "Retrying inspect since container not yet started. 
cmd: '" + << cmd << "', interval: " << stringify(retryInterval.get()); + Clock::timer(retryInterval.get(), + [=]() { _inspect(cmd, promise, retryInterval); } ); return; } - // TODO(benh): Handle the case where the short container ID was - // not sufficiently unique and 'array.values.size() > 1'. - - promise->fail("Failed to find container"); + promise->set(container.get()); } http://git-wip-us.apache.org/repos/asf/mesos/blob/b16999a4/src/docker/docker.hpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index d06c73a..7790d0f 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -49,7 +49,11 @@ public: class Container { public: - static Try<Container> create(const JSON::Object& json); + static Try<Container> create( + const std::string& output); + + // Returns the docker inspect output. + const std::string output; // Returns the ID of the container. const std::string id; @@ -67,11 +71,12 @@ public: private: Container( + const std::string& output, const std::string& id, const std::string& name, const Option<pid_t>& pid, bool started) - : id(id), name(name), pid(pid), started(started) {} + : output(output), id(id), name(name), pid(pid), started(started) {} }; class Image http://git-wip-us.apache.org/repos/asf/mesos/blob/b16999a4/src/docker/executor.cpp ---------------------------------------------------------------------- diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp index 075c6b5..709fbe3 100644 --- a/src/docker/executor.cpp +++ b/src/docker/executor.cpp @@ -52,6 +52,8 @@ namespace docker { using namespace mesos; using namespace process; +const Duration DOCKER_INSPECT_DELAY = Milliseconds(500); +const Duration DOCKER_INSPECT_TIMEOUT = Seconds(5); // Executor that is responsible to execute a docker container, and // redirect log output to configured stdout and stderr files. 
@@ -65,17 +67,18 @@ class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess> public: DockerExecutorProcess( const Owned<Docker>& docker, - const string& container, + const string& containerName, const string& sandboxDirectory, const string& mappedDirectory, const Duration& stopTimeout) : killed(false), docker(docker), - container(container), + containerName(containerName), sandboxDirectory(sandboxDirectory), mappedDirectory(mappedDirectory), stopTimeout(stopTimeout), - stop(Nothing()) {} + stop(Nothing()), + inspect(Nothing()) {} virtual ~DockerExecutorProcess() {} @@ -114,7 +117,9 @@ public: return; } - cout << "Starting task " << task.task_id().value() << endl; + TaskID taskId = task.task_id(); + + cout << "Starting task " << taskId.value() << endl; CHECK(task.has_container()); CHECK(task.has_command()); @@ -130,7 +135,7 @@ public: run = docker->run( task.container(), task.command(), - container, + containerName, sandboxDirectory, mappedDirectory, task.resources() + task.executor().resources(), @@ -141,13 +146,23 @@ public: self(), &Self::reaped, driver, - task.task_id(), + taskId, lambda::_1)); - TaskStatus status; - status.mutable_task_id()->CopyFrom(task.task_id()); - status.set_state(TASK_RUNNING); - driver->sendStatusUpdate(status); + // Delay sending TASK_RUNNING status update until we receive + // inspect output. + inspect = docker->inspect(containerName, DOCKER_INSPECT_DELAY) + .then(defer(self(), [=](const Docker::Container& container) { + if (!killed) { + TaskStatus status; + status.mutable_task_id()->CopyFrom(taskId); + status.set_state(TASK_RUNNING); + status.set_data(container.output); + driver->sendStatusUpdate(status); + } + + return Nothing(); + })); } void killTask(ExecutorDriver* driver, const TaskID& taskId) @@ -169,7 +184,7 @@ public: // Making a mutable copy of the future so we can call discard. 
Future<Nothing>(run.get()).discard(); - stop = docker->stop(container, stopTimeout); + stop = docker->stop(containerName, stopTimeout); killed = true; } } @@ -178,59 +193,64 @@ public: private: void reaped( - ExecutorDriver* driver, + ExecutorDriver* _driver, const TaskID& taskId, const Future<Nothing>& run) { - stop.onAny(defer(self(), &Self::_reaped, driver, taskId, run, lambda::_1)); - } - - void _reaped( - ExecutorDriver* driver, - const TaskID& taskId, - const Future<Nothing>& run, - const Future<Nothing>& stop) - { - TaskState state; - string message; - if (!stop.isReady()) { - state = TASK_FAILED; - message = "Unable to stop docker container, error: " + - (stop.isFailed() ? stop.failure() : "future discarded"); - } else if (killed) { - state = TASK_KILLED; - } else if (!run.isReady()) { - state = TASK_FAILED; - message = "Docker container run error: " + - (run.isFailed() ? run.failure() : "future discarded"); - } else { - state = TASK_FINISHED; - } - - TaskStatus taskStatus; - taskStatus.mutable_task_id()->CopyFrom(taskId); - taskStatus.set_state(state); - taskStatus.set_message(message); - - driver->sendStatusUpdate(taskStatus); - - // A hack for now ... but we need to wait until the status update - // is sent to the slave before we shut ourselves down. - // TODO(tnachen): Remove this hack and also the same hack in the - // command executor when we have the new HTTP APIs to wait until - // an ack. - os::sleep(Seconds(1)); - driver->stop(); + // Wait for docker->stop to finish, and best effort wait for the + // inspect future to complete with a timeout. + stop.onAny(defer(self(), [=](const Future<Nothing>&) { + inspect + .after(DOCKER_INSPECT_TIMEOUT, [=](const Future<Nothing>&) { + inspect.discard(); + return inspect; + }) + .onAny(defer(self(), [=](const Future<Nothing>&) { + CHECK_SOME(driver); + TaskState state; + string message; + if (!stop.isReady()) { + state = TASK_FAILED; + message = "Unable to stop docker container, error: " + + (stop.isFailed() ? 
stop.failure() : "future discarded"); + } else if (killed) { + state = TASK_KILLED; + } else if (!run.isReady()) { + state = TASK_FAILED; + message = "Docker container run error: " + + (run.isFailed() ? + run.failure() : "future discarded"); + } else { + state = TASK_FINISHED; + } + + TaskStatus taskStatus; + taskStatus.mutable_task_id()->CopyFrom(taskId); + taskStatus.set_state(state); + taskStatus.set_message(message); + + driver.get()->sendStatusUpdate(taskStatus); + + // A hack for now ... but we need to wait until the status update + // is sent to the slave before we shut ourselves down. + // TODO(tnachen): Remove this hack and also the same hack in the + // command executor when we have the new HTTP APIs to wait until + // an ack. + os::sleep(Seconds(1)); + driver.get()->stop(); + })); + })); } bool killed; Owned<Docker> docker; - string container; + string containerName; string sandboxDirectory; string mappedDirectory; Duration stopTimeout; Option<Future<Nothing>> run; Future<Nothing> stop; + Future<Nothing> inspect; Option<ExecutorDriver*> driver; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/b16999a4/src/tests/docker_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp index 7524803..8d3e605 100644 --- a/src/tests/docker_containerizer_tests.cpp +++ b/src/tests/docker_containerizer_tests.cpp @@ -713,6 +713,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch) AWAIT_READY_FOR(containerId, Seconds(60)); AWAIT_READY_FOR(statusRunning, Seconds(60)); EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + ASSERT_TRUE(statusRunning.get().has_data()); + + Try<JSON::Array> parse = JSON::parse<JSON::Array>(statusRunning.get().data()); + ASSERT_SOME(parse); ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
