Add executor for docker containerizer

Review: https://reviews.apache.org/r/29329


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/24415b72
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/24415b72
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/24415b72

Branch: refs/heads/master
Commit: 24415b72ef7eecfe3dd9022e5d58b6087aa10a6a
Parents: 8f31db8
Author: Timothy Chen <[email protected]>
Authored: Tue Nov 25 18:25:00 2014 -0800
Committer: Timothy Chen <[email protected]>
Committed: Fri May 22 23:13:50 2015 -0700

----------------------------------------------------------------------
 src/Makefile.am         |   5 +
 src/docker/executor.cpp | 358 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 363 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/24415b72/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 34755cf..1e56ae5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -822,6 +822,11 @@ mesos_usage_SOURCES = usage/main.cpp
 mesos_usage_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_usage_LDADD = libmesos.la $(LDADD)
 
+pkglibexec_PROGRAMS += mesos-docker-executor
+mesos_docker_executor_SOURCES = docker/executor.cpp
+mesos_docker_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_docker_executor_LDADD = libmesos.la
+
 bin_PROGRAMS += mesos-log
 mesos_log_SOURCES = log/main.cpp
 mesos_log_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/24415b72/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
new file mode 100644
index 0000000..1a5ab86
--- /dev/null
+++ b/src/docker/executor.cpp
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+
+#include <string>
+
+#include <mesos/mesos.hpp>
+#include <mesos/executor.hpp>
+
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+#include <process/subprocess.hpp>
+#include <process/reap.hpp>
+
+#include <stout/flags.hpp>
+#include <stout/os.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "logging/logging.hpp"
+
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::string;
+
+namespace mesos {
+namespace internal {
+
+using namespace mesos;
+using namespace process;
+
+
+// Executor that is responsible to execute a docker container, and
+// redirect log output to configured stdout and stderr files.
+// Similar to the CommandExecutor, it is only responsible to launch
+// one container and exits afterwards.
+// The executor also assumes it is launched from the
+// DockerContainerizer, which already calls setsid before launching
+// the executor.
+class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess>
+{
+public:
+  DockerExecutorProcess(const string& docker, const string& container)
+    : launched(false),
+      docker(docker),
+      container(container),
+      pid(-1) {}
+
+  virtual ~DockerExecutorProcess() {}
+
+  void registered(
+      ExecutorDriver* _driver,
+      const ExecutorInfo& executorInfo,
+      const FrameworkInfo& frameworkInfo,
+      const SlaveInfo& slaveInfo)
+  {
+    cout << "Registered docker executor on " << slaveInfo.hostname() << endl;
+    driver = _driver;
+  }
+
+  void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo)
+  {
+    cout << "Re-registered docker executor on " << slaveInfo.hostname() << 
endl;
+  }
+
+  void disconnected(ExecutorDriver* driver) {}
+
+  void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    if (launched) {
+      TaskStatus status;
+      status.mutable_task_id()->MergeFrom(task.task_id());
+      status.set_state(TASK_FAILED);
+      status.set_message(
+          "Attempted to run multiple tasks using a \"docker\" executor");
+
+      driver->sendStatusUpdate(status);
+      return;
+    }
+
+    cout << "Starting task " << task.task_id().value() << endl;
+
+    Try<Subprocess> subprocess = process::subprocess(
+        "exit `" + docker + " wait + " + container + "`",
+        Subprocess::PATH("/dev/null"),
+        Subprocess::FD(STDOUT_FILENO),
+        Subprocess::FD(STDERR_FILENO));
+
+    if (subprocess.isError()) {
+      cerr << "Couldn't launch docker wait process: " << subprocess.error();
+      abort();
+    }
+
+    pid = subprocess.get().pid();
+
+    process::reap(pid)
+      .onAny(defer(self(),
+                   &Self::reaped,
+                   driver,
+                   task.task_id(),
+                   pid,
+                   lambda::_1));
+
+    TaskStatus status;
+    status.mutable_task_id()->MergeFrom(task.task_id());
+    status.set_state(TASK_RUNNING);
+    driver->sendStatusUpdate(status);
+
+    launched = true;
+  }
+
+  void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    shutdown(driver);
+  }
+
+  void frameworkMessage(ExecutorDriver* driver, const string& data) {}
+
+  void shutdown(ExecutorDriver* driver)
+  {
+    cout << "Shutting down" << endl;
+
+    if (pid > 0 && !killed) {
+      ::kill(pid, SIGKILL);
+      killed = true;
+    }
+  }
+
+  virtual void error(ExecutorDriver* driver, const string& message) {}
+
+private:
+  void reaped(
+      ExecutorDriver* driver,
+      const TaskID& taskId,
+      pid_t pid,
+      const Future<Option<int>>& status)
+  {
+    TaskState state;
+    string message;
+    if (!status.isReady()) {
+      state = TASK_FAILED;
+      message =
+        "Failed to get exit status for Docker executor: " +
+        (status.isFailed() ? status.failure() : "future discarded");
+    } else if (status.get().isNone()) {
+      state = TASK_FAILED;
+      message = "Failed to get exit status for Docker executor";
+    } else {
+      int s = status.get().get();
+
+      // Subprocess status is gathered from waitpid, therefore we can
+      // get the exit status from WIFEXITED.
+      CHECK(WIFEXITED(s) || WIFSIGNALED(s)) << "status code: " << s;
+
+      if (WIFEXITED(s) && WEXITSTATUS(s) == 0) {
+        state = TASK_FINISHED;
+      } else if (killed) {
+        // Send TASK_KILLED if the task was killed as a result of
+        // killTask() or shutdown().
+        state = TASK_KILLED;
+      } else {
+        state = TASK_FAILED;
+      }
+
+      message = "Docker  " + WSTRINGIFY(s);
+    }
+
+    cout << message << " (pid: " << pid << ")" << endl;
+
+    TaskStatus taskStatus;
+    taskStatus.mutable_task_id()->MergeFrom(taskId);
+    taskStatus.set_state(state);
+    taskStatus.set_message(message);
+
+    driver->sendStatusUpdate(taskStatus);
+
+    // A hack for now ... but we need to wait until the status update
+    // is sent to the slave before we shut ourselves down.
+    os::sleep(Seconds(1));
+    driver->stop();
+  }
+
+
+  bool launched;
+  string docker;
+  string container;
+  pid_t pid;
+  bool killed;
+  Option<ExecutorDriver*> driver;
+};
+
+
+class DockerExecutor : public Executor
+{
+public:
+  DockerExecutor(const string& docker, const string& container)
+  {
+    process = new DockerExecutorProcess(docker, container);
+    spawn(process);
+  }
+
+  virtual ~DockerExecutor()
+  {
+    terminate(process);
+    wait(process);
+    delete process;
+  }
+
+  virtual void registered(
+      ExecutorDriver* driver,
+      const ExecutorInfo& executorInfo,
+      const FrameworkInfo& frameworkInfo,
+      const SlaveInfo& slaveInfo)
+  {
+    dispatch(process,
+             &DockerExecutorProcess::registered,
+             driver,
+             executorInfo,
+             frameworkInfo,
+             slaveInfo);
+  }
+
+  virtual void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo)
+  {
+    dispatch(process,
+             &DockerExecutorProcess::reregistered,
+             driver,
+             slaveInfo);
+  }
+
+  virtual void disconnected(ExecutorDriver* driver)
+  {
+    dispatch(process, &DockerExecutorProcess::disconnected, driver);
+  }
+
+  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    dispatch(process, &DockerExecutorProcess::launchTask, driver, task);
+  }
+
+  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    dispatch(process, &DockerExecutorProcess::killTask, driver, taskId);
+  }
+
+  virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(process, &DockerExecutorProcess::frameworkMessage, driver, data);
+  }
+
+  virtual void shutdown(ExecutorDriver* driver)
+  {
+    dispatch(process, &DockerExecutorProcess::shutdown, driver);
+  }
+
+  virtual void error(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(process, &DockerExecutorProcess::error, driver, data);
+  }
+
+private:
+  DockerExecutorProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+
+void usage(const char* argv0, const flags::FlagsBase& flags)
+{
+  cerr << "Usage: " << os::basename(argv0).get() << " [...]" << endl
+       << endl
+       << "Supported options:" << endl
+       << flags.usage();
+}
+
+
+class Flags : public flags::FlagsBase
+{
+public:
+  Flags()
+  {
+    add(&Flags::container,
+        "container",
+        "The name of the docker container to wait on.");
+
+    add(&Flags::docker,
+        "docker",
+        "The path to the docker cli executable.");
+  }
+
+  Option<string> container;
+  Option<string> docker;
+};
+
+
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  bool help;
+  flags.add(&help,
+            "help",
+            "Prints this help message",
+            false);
+
+  // Load flags from environment and command line.
+  Try<Nothing> load = flags.load(None(), &argc, &argv);
+
+  if (load.isError()) {
+    cerr << load.error() << endl;
+    usage(argv[0], flags);
+    return -1;
+  }
+
+  if (help) {
+    usage(argv[0], flags);
+    return -1;
+  }
+
+  if (flags.docker.isNone()) {
+    LOG(WARNING) << "Expected docker executable path";
+    usage(argv[0], flags);
+    return 0;
+  }
+
+  if (flags.container.isNone()) {
+    LOG(WARNING) << "Expected container name";
+    usage(argv[0], flags);
+    return 0;
+  }
+
+  mesos::internal::DockerExecutor executor(
+      flags.docker.get(), flags.container.get());
+  mesos::MesosExecutorDriver driver(&executor);
+  return driver.run() == mesos::DRIVER_STOPPED ? 0 : 1;
+}

Reply via email to