Repository: mesos
Updated Branches:
  refs/heads/master 76873d345 -> b16999a4c


Send docker inspect output with TaskStatus data.

Review: https://reviews.apache.org/r/34654


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b16999a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b16999a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b16999a4

Branch: refs/heads/master
Commit: b16999a4cb92ad35e97d29e7e99d897063f8bdca
Parents: 76873d3
Author: Timothy Chen <[email protected]>
Authored: Sat May 23 22:47:49 2015 -0700
Committer: Timothy Chen <[email protected]>
Committed: Sun May 31 22:17:28 2015 -0700

----------------------------------------------------------------------
 src/docker/docker.cpp                    |  58 ++++++------
 src/docker/docker.hpp                    |   9 +-
 src/docker/executor.cpp                  | 124 +++++++++++++++-----------
 src/tests/docker_containerizer_tests.cpp |   4 +
 4 files changed, 111 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b16999a4/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index ee74da5..7138329 100644
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -204,8 +204,24 @@ Future<Version> Docker::__version(const Future<string>& output)
 }
 
 
-Try<Docker::Container> Docker::Container::create(const JSON::Object& json)
+Try<Docker::Container> Docker::Container::create(const string& output)
 {
+  Try<JSON::Array> parse = JSON::parse<JSON::Array>(output);
+  if (parse.isError()) {
+    return Error("Failed to parse JSON: " + parse.error());
+  }
+
+  // TODO(benh): Handle the case where the short container ID was
+  // not sufficiently unique and 'array.values.size() > 1'.
+  JSON::Array array = parse.get();
+  if (array.values.size() != 1) {
+    return Error("Failed to find container");
+  }
+
+  CHECK(array.values.front().is<JSON::Object>());
+
+  JSON::Object json = array.values.front().as<JSON::Object>();
+
   Result<JSON::String> idValue = json.find<JSON::String>("Id");
   if (idValue.isNone()) {
     return Error("Unable to find Id in container");
@@ -255,7 +271,7 @@ Try<Docker::Container> Docker::Container::create(const JSON::Object& json)
 
   bool started = startedAtValue.get().value != "0001-01-01T00:00:00Z";
 
-  return Docker::Container(id, name, optionalPid, started);
+  return Docker::Container(output, id, name, optionalPid, started);
 }
 
 
@@ -725,41 +741,23 @@ void Docker::___inspect(
     return;
   }
 
-  Try<JSON::Array> parse = JSON::parse<JSON::Array>(output.get());
+  Try<Docker::Container> container = Docker::Container::create(
+      output.get());
 
-  if (parse.isError()) {
-    promise->fail("Failed to parse JSON: " + parse.error());
+  if (container.isError()) {
+    promise->fail("Unable to create container: " + container.error());
     return;
   }
 
-  JSON::Array array = parse.get();
-  // Only return if only one container identified with name.
-  if (array.values.size() == 1) {
-    CHECK(array.values.front().is<JSON::Object>());
-    Try<Docker::Container> container =
-      Docker::Container::create(array.values.front().as<JSON::Object>());
-
-    if (container.isError()) {
-      promise->fail("Unable to create container: " + container.error());
-      return;
-    }
-
-    if (retryInterval.isSome() && !container.get().started) {
-      VLOG(1) << "Retrying inspect since container not yet started. cmd: '"
-              << cmd << "', interval: " << stringify(retryInterval.get());
-      Clock::timer(retryInterval.get(),
-                   [=]() { _inspect(cmd, promise, retryInterval); } );
-      return;
-    }
-
-    promise->set(container.get());
+  if (retryInterval.isSome() && !container.get().started) {
+    VLOG(1) << "Retrying inspect since container not yet started. cmd: '"
+            << cmd << "', interval: " << stringify(retryInterval.get());
+    Clock::timer(retryInterval.get(),
+                 [=]() { _inspect(cmd, promise, retryInterval); } );
     return;
   }
 
-  // TODO(benh): Handle the case where the short container ID was
-  // not sufficiently unique and 'array.values.size() > 1'.
-
-  promise->fail("Failed to find container");
+  promise->set(container.get());
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b16999a4/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index d06c73a..7790d0f 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -49,7 +49,11 @@ public:
   class Container
   {
   public:
-    static Try<Container> create(const JSON::Object& json);
+    static Try<Container> create(
+        const std::string& output);
+
+    // Returns the docker inspect output.
+    const std::string output;
 
     // Returns the ID of the container.
     const std::string id;
@@ -67,11 +71,12 @@ public:
 
   private:
     Container(
+        const std::string& output,
         const std::string& id,
         const std::string& name,
         const Option<pid_t>& pid,
         bool started)
-      : id(id), name(name), pid(pid), started(started) {}
+      : output(output), id(id), name(name), pid(pid), started(started) {}
   };
 
   class Image

http://git-wip-us.apache.org/repos/asf/mesos/blob/b16999a4/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index 075c6b5..709fbe3 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -52,6 +52,8 @@ namespace docker {
 using namespace mesos;
 using namespace process;
 
+const Duration DOCKER_INSPECT_DELAY = Milliseconds(500);
+const Duration DOCKER_INSPECT_TIMEOUT = Seconds(5);
 
 // Executor that is responsible to execute a docker container, and
 // redirect log output to configured stdout and stderr files.
@@ -65,17 +67,18 @@ class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess>
 public:
   DockerExecutorProcess(
       const Owned<Docker>& docker,
-      const string& container,
+      const string& containerName,
       const string& sandboxDirectory,
       const string& mappedDirectory,
       const Duration& stopTimeout)
     : killed(false),
       docker(docker),
-      container(container),
+      containerName(containerName),
       sandboxDirectory(sandboxDirectory),
       mappedDirectory(mappedDirectory),
       stopTimeout(stopTimeout),
-      stop(Nothing()) {}
+      stop(Nothing()),
+      inspect(Nothing()) {}
 
   virtual ~DockerExecutorProcess() {}
 
@@ -114,7 +117,9 @@ public:
       return;
     }
 
-    cout << "Starting task " << task.task_id().value() << endl;
+    TaskID taskId = task.task_id();
+
+    cout << "Starting task " << taskId.value() << endl;
 
     CHECK(task.has_container());
     CHECK(task.has_command());
@@ -130,7 +135,7 @@ public:
     run = docker->run(
         task.container(),
         task.command(),
-        container,
+        containerName,
         sandboxDirectory,
         mappedDirectory,
         task.resources() + task.executor().resources(),
@@ -141,13 +146,23 @@ public:
         self(),
         &Self::reaped,
         driver,
-        task.task_id(),
+        taskId,
         lambda::_1));
 
-    TaskStatus status;
-    status.mutable_task_id()->CopyFrom(task.task_id());
-    status.set_state(TASK_RUNNING);
-    driver->sendStatusUpdate(status);
+    // Delay sending TASK_RUNNING status update until we receive
+    // inspect output.
+    inspect = docker->inspect(containerName, DOCKER_INSPECT_DELAY)
+      .then(defer(self(), [=](const Docker::Container& container) {
+        if (!killed) {
+          TaskStatus status;
+          status.mutable_task_id()->CopyFrom(taskId);
+          status.set_state(TASK_RUNNING);
+          status.set_data(container.output);
+          driver->sendStatusUpdate(status);
+        }
+
+        return Nothing();
+      }));
   }
 
   void killTask(ExecutorDriver* driver, const TaskID& taskId)
@@ -169,7 +184,7 @@ public:
 
       // Making a mutable copy of the future so we can call discard.
       Future<Nothing>(run.get()).discard();
-      stop = docker->stop(container, stopTimeout);
+      stop = docker->stop(containerName, stopTimeout);
       killed = true;
     }
   }
@@ -178,59 +193,64 @@ public:
 
 private:
   void reaped(
-      ExecutorDriver* driver,
+      ExecutorDriver* _driver,
       const TaskID& taskId,
       const Future<Nothing>& run)
   {
-    stop.onAny(defer(self(), &Self::_reaped, driver, taskId, run, lambda::_1));
-  }
-
-  void _reaped(
-      ExecutorDriver* driver,
-      const TaskID& taskId,
-      const Future<Nothing>& run,
-      const Future<Nothing>& stop)
-  {
-    TaskState state;
-    string message;
-    if (!stop.isReady()) {
-      state = TASK_FAILED;
-      message = "Unable to stop docker container, error: " +
-                (stop.isFailed() ? stop.failure() : "future discarded");
-    } else if (killed) {
-      state = TASK_KILLED;
-    } else if (!run.isReady()) {
-      state = TASK_FAILED;
-      message = "Docker container run error: " +
-                (run.isFailed() ? run.failure() : "future discarded");
-    } else {
-      state = TASK_FINISHED;
-    }
-
-    TaskStatus taskStatus;
-    taskStatus.mutable_task_id()->CopyFrom(taskId);
-    taskStatus.set_state(state);
-    taskStatus.set_message(message);
-
-    driver->sendStatusUpdate(taskStatus);
-
-    // A hack for now ... but we need to wait until the status update
-    // is sent to the slave before we shut ourselves down.
-    // TODO(tnachen): Remove this hack and also the same hack in the
-    // command executor when we have the new HTTP APIs to wait until
-    // an ack.
-    os::sleep(Seconds(1));
-    driver->stop();
+    // Wait for docker->stop to finish, and best effort wait for the
+    // inspect future to complete with a timeout.
+    stop.onAny(defer(self(), [=](const Future<Nothing>&) {
+      inspect
+        .after(DOCKER_INSPECT_TIMEOUT, [=](const Future<Nothing>&) {
+          inspect.discard();
+          return inspect;
+        })
+        .onAny(defer(self(), [=](const Future<Nothing>&) {
+          CHECK_SOME(driver);
+          TaskState state;
+          string message;
+          if (!stop.isReady()) {
+            state = TASK_FAILED;
+            message = "Unable to stop docker container, error: " +
+                      (stop.isFailed() ? stop.failure() : "future discarded");
+          } else if (killed) {
+            state = TASK_KILLED;
+          } else if (!run.isReady()) {
+            state = TASK_FAILED;
+            message = "Docker container run error: " +
+                      (run.isFailed() ?
+                       run.failure() : "future discarded");
+          } else {
+            state = TASK_FINISHED;
+          }
+
+          TaskStatus taskStatus;
+          taskStatus.mutable_task_id()->CopyFrom(taskId);
+          taskStatus.set_state(state);
+          taskStatus.set_message(message);
+
+          driver.get()->sendStatusUpdate(taskStatus);
+
+          // A hack for now ... but we need to wait until the status update
+          // is sent to the slave before we shut ourselves down.
+          // TODO(tnachen): Remove this hack and also the same hack in the
+          // command executor when we have the new HTTP APIs to wait until
+          // an ack.
+          os::sleep(Seconds(1));
+          driver.get()->stop();
+        }));
+    }));
   }
 
   bool killed;
   Owned<Docker> docker;
-  string container;
+  string containerName;
   string sandboxDirectory;
   string mappedDirectory;
   Duration stopTimeout;
   Option<Future<Nothing>> run;
   Future<Nothing> stop;
+  Future<Nothing> inspect;
   Option<ExecutorDriver*> driver;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b16999a4/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 7524803..8d3e605 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -713,6 +713,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
   AWAIT_READY_FOR(containerId, Seconds(60));
   AWAIT_READY_FOR(statusRunning, Seconds(60));
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  ASSERT_TRUE(statusRunning.get().has_data());
+
+  Try<JSON::Array> parse = JSON::parse<JSON::Array>(statusRunning.get().data());
+  ASSERT_SOME(parse);
 
   ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
 

Reply via email to