Added composing containerizer and --containerizers flag.

Review: https://reviews.apache.org/r/23462


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/29910a6e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/29910a6e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/29910a6e

Branch: refs/heads/master
Commit: 29910a6e27b8e9374e323c05ffc63659b0180388
Parents: 1ae6ade
Author: Benjamin Hindman <[email protected]>
Authored: Sun Jun 22 17:54:53 2014 -0700
Committer: Benjamin Hindman <[email protected]>
Committed: Mon Aug 4 09:15:50 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                           |   2 +
 src/slave/containerizer/composing.cpp     | 545 +++++++++++++++++++++++++
 src/slave/containerizer/composing.hpp     | 100 +++++
 src/slave/containerizer/containerizer.cpp |  39 +-
 src/slave/flags.hpp                       |  11 +
 5 files changed, 693 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index c7ed168..0d9e3f0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -261,6 +261,8 @@ libmesos_no_3rdparty_la_SOURCES =                           \
        slave/state.cpp                                                 \
        slave/slave.cpp                                                 \
        slave/containerizer/containerizer.cpp                           \
+       slave/containerizer/composing.cpp                               \
+       slave/containerizer/composing.hpp                               \
        slave/containerizer/external_containerizer.cpp                  \
        slave/containerizer/isolator.cpp                                \
        slave/containerizer/launcher.cpp                                \

http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/containerizer/composing.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/composing.cpp b/src/slave/containerizer/composing.cpp
new file mode 100644
index 0000000..9b36d91
--- /dev/null
+++ b/src/slave/containerizer/composing.cpp
@@ -0,0 +1,545 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <list>
+#include <vector>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+
+#include "slave/state.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/composing.hpp"
+
+using std::list;
+using std::string;
+using std::vector;
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+
+class ComposingContainerizerProcess
+  : public Process<ComposingContainerizerProcess>
+{
+public:
+  ComposingContainerizerProcess(
+      const vector<Containerizer*>& containerizers);
+
+  virtual ~ComposingContainerizerProcess();
+
+  Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+
+  Future<bool> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint);
+
+  Future<bool> launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint);
+
+  Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+
+  void destroy(const ContainerID& containerId);
+
+  Future<hashset<ContainerID> > containers();
+
+private:
+  // Continuations.
+  Future<Nothing> _recover();
+  Future<Nothing> __recover(
+      Containerizer* containerizer,
+      const hashset<ContainerID>& containers);
+  static Future<Nothing> ___recover();
+
+  Future<bool> _launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint,
+      vector<Containerizer*>::iterator containerizer,
+      bool launched);
+
+  Future<bool> _launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint,
+      vector<Containerizer*>::iterator containerizer,
+      bool launched);
+
+  vector<Containerizer*> containerizers_;
+  hashmap<Containerizer*, hashset<ContainerID> > containers_;
+};
+
+
+Try<ComposingContainerizer*> ComposingContainerizer::create(
+    const vector<Containerizer*>& containerizers)
+{
+  return new ComposingContainerizer(containerizers);
+}
+
+
+ComposingContainerizer::ComposingContainerizer(
+    const vector<Containerizer*>& containerizers)
+{
+  process = new ComposingContainerizerProcess(containerizers);
+  spawn(process);
+}
+
+
+ComposingContainerizer::~ComposingContainerizer()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Nothing> ComposingContainerizer::recover(
+    const Option<state::SlaveState>& state)
+{
+  return dispatch(process, &ComposingContainerizerProcess::recover, state);
+}
+
+
+Future<bool> ComposingContainerizer::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(process,
+                  &ComposingContainerizerProcess::launch,
+                  containerId,
+                  executorInfo,
+                  directory,
+                  user,
+                  slaveId,
+                  slavePid,
+                  checkpoint);
+}
+
+
+Future<bool> ComposingContainerizer::launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(process,
+                  &ComposingContainerizerProcess::launch,
+                  containerId,
+                  taskInfo,
+                  executorInfo,
+                  directory,
+                  user,
+                  slaveId,
+                  slavePid,
+                  checkpoint);
+}
+
+
+Future<Nothing> ComposingContainerizer::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  return dispatch(process,
+                  &ComposingContainerizerProcess::update,
+                  containerId,
+                  resources);
+}
+
+
+Future<ResourceStatistics> ComposingContainerizer::usage(
+    const ContainerID& containerId)
+{
+  return dispatch(process, &ComposingContainerizerProcess::usage, containerId);
+}
+
+
+Future<containerizer::Termination> ComposingContainerizer::wait(
+    const ContainerID& containerId)
+{
+  return dispatch(process, &ComposingContainerizerProcess::wait, containerId);
+}
+
+
+void ComposingContainerizer::destroy(const ContainerID& containerId)
+{
+  dispatch(process, &ComposingContainerizerProcess::destroy, containerId);
+}
+
+
+Future<hashset<ContainerID> > ComposingContainerizer::containers()
+{
+  return dispatch(process, &ComposingContainerizerProcess::containers);
+}
+
+
+ComposingContainerizerProcess::ComposingContainerizerProcess(
+    const vector<Containerizer*>& containerizers)
+  : containerizers_(containerizers)
+{
+  foreach (Containerizer* containerizer, containerizers_) {
+    containers_[containerizer] = hashset<ContainerID>();
+  }
+}
+
+
+ComposingContainerizerProcess::~ComposingContainerizerProcess()
+{
+  foreach (Containerizer* containerizer, containerizers_) {
+    delete containerizer;
+  }
+  containerizers_.clear();
+  containers_.clear();
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::recover(
+    const Option<state::SlaveState>& state)
+{
+  // Recover each containerizer in parallel.
+  list<Future<Nothing> > futures;
+  foreach (Containerizer* containerizer, containerizers_) {
+    futures.push_back(containerizer->recover(state));
+  }
+
+  return collect(futures)
+    .then(defer(self(), &Self::_recover));
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::_recover()
+{
+  // Now collect all the running containers in order to multiplex.
+  list<Future<Nothing> > futures;
+  foreach (Containerizer* containerizer, containerizers_) {
+    Future<Nothing> future = containerizer->containers()
+      .then(defer(self(), &Self::__recover, containerizer, lambda::_1));
+    futures.push_back(future);
+  }
+
+  return collect(futures)
+    .then(lambda::bind(&Self::___recover));
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::__recover(
+    Containerizer* containerizer,
+    const hashset<ContainerID>& containers)
+{
+  containers_[containerizer] = containers;
+  return Nothing();
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::___recover()
+{
+  return Nothing();
+}
+
+
+Future<bool> ComposingContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  // Try each containerizer. If none of them handle the
+  // TaskInfo/ExecutorInfo then return a Failure.
+  vector<Containerizer*>::iterator containerizer = containerizers_.begin();
+
+  return (*containerizer)->launch(
+      containerId,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                containerizer,
+                lambda::_1));
+}
+
+
+Future<bool> ComposingContainerizerProcess::_launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint,
+    vector<Containerizer*>::iterator containerizer,
+    bool launched)
+{
+  if (launched) {
+    containers_[*containerizer].insert(containerId);
+    return true;
+  }
+
+  // Try the next containerizer.
+  ++containerizer;
+
+  if (containerizer == containerizers_.end()) {
+    return false;
+  }
+
+  return (*containerizer)->launch(
+      containerId,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                containerizer,
+                lambda::_1));
+}
+
+
+Future<bool> ComposingContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  // Try each containerizer. If none of them handle the
+  // TaskInfo/ExecutorInfo then return a Failure.
+  vector<Containerizer*>::iterator containerizer = containerizers_.begin();
+
+  return (*containerizer)->launch(
+      containerId,
+      taskInfo,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                taskInfo,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                containerizer,
+                lambda::_1));
+}
+
+Future<bool> ComposingContainerizerProcess::_launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint,
+    vector<Containerizer*>::iterator containerizer,
+    bool launched)
+{
+  if (launched) {
+    containers_[*containerizer].insert(containerId);
+    return true;
+  }
+
+  // Try the next containerizer.
+  ++containerizer;
+
+  if (containerizer == containerizers_.end()) {
+    return false;
+  }
+
+  return (*containerizer)->launch(
+      containerId,
+      taskInfo,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                taskInfo,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                containerizer,
+                lambda::_1));
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  foreachpair (Containerizer* containerizer,
+               const hashset<ContainerID>& containers,
+               containers_) {
+    if (containers.contains(containerId)) {
+      return containerizer->update(containerId, resources);
+    }
+  }
+
+  return Failure("No container found");
+}
+
+
+Future<ResourceStatistics> ComposingContainerizerProcess::usage(
+    const ContainerID& containerId)
+{
+  foreachpair (Containerizer* containerizer,
+               const hashset<ContainerID>& containers,
+               containers_) {
+    if (containers.contains(containerId)) {
+      return containerizer->usage(containerId);
+    }
+  }
+
+  return Failure("No container found");
+}
+
+
+Future<containerizer::Termination> ComposingContainerizerProcess::wait(
+    const ContainerID& containerId)
+{
+  foreachpair (Containerizer* containerizer,
+               const hashset<ContainerID>& containers,
+               containers_) {
+    if (containers.contains(containerId)) {
+      return containerizer->wait(containerId);
+    }
+  }
+
+  return Failure("No container found");
+}
+
+
+void ComposingContainerizerProcess::destroy(const ContainerID& containerId)
+{
+  foreachpair (Containerizer* containerizer,
+               const hashset<ContainerID>& containers,
+               containers_) {
+    if (containers.contains(containerId)) {
+      containerizer->destroy(containerId);
+      break;
+    }
+  }
+}
+
+
+// TODO(benh): Move into stout/hashset.hpp
+template <typename Elem>
+hashset<Elem> merge(const std::list<hashset<Elem> >& list)
+{
+  hashset<Elem> result;
+  foreach (const hashset<Elem>& set, list) {
+    result.insert(set.begin(), set.end());
+  }
+  return result;
+}
+
+
+Future<hashset<ContainerID> > ComposingContainerizerProcess::containers()
+{
+  return merge(containers_.values());
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/containerizer/composing.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/composing.hpp b/src/slave/containerizer/composing.hpp
new file mode 100644
index 0000000..f1e60b0
--- /dev/null
+++ b/src/slave/containerizer/composing.hpp
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __COMPOSING_CONTAINERIZER_HPP__
+#define __COMPOSING_CONTAINERIZER_HPP__
+
+#include <map>
+
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <mesos/containerizer/containerizer.hpp>
+
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declaration.
+class ComposingContainerizerProcess;
+
+class ComposingContainerizer : public Containerizer
+{
+public:
+  static Try<ComposingContainerizer*> create(
+      const std::vector<Containerizer*>& containerizers);
+
+  ComposingContainerizer(
+      const std::vector<Containerizer*>& containerizers);
+
+  virtual ~ComposingContainerizer();
+
+  virtual process::Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+
+  virtual process::Future<bool> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  virtual process::Future<bool> launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  virtual process::Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+
+  virtual void destroy(const ContainerID& containerId);
+
+  virtual process::Future<hashset<ContainerID> > containers();
+
+private:
+  ComposingContainerizerProcess* process;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __COMPOSING_CONTAINERIZER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 1b71f33..d62d25d 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -34,6 +34,7 @@
 #ifdef __linux__
 #include "slave/containerizer/linux_launcher.hpp"
 #endif // __linux__
+#include "slave/containerizer/composing.hpp"
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/containerizer/isolator.hpp"
 #include "slave/containerizer/launcher.hpp"
@@ -153,12 +154,42 @@ Try<Resources> Containerizer::resources(const Flags& flags)
 
 Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
 {
-  if (flags.isolation == "external") {
-    return new ExternalContainerizer(flags);
+  // TODO(benh): We need to store which containerizer or
+  // containerizers were being used. See MESOS-1663.
+
+  // Create containerizer(s).
+  vector<Containerizer*> containerizers;
+
+  foreach (const string& type, strings::split(flags.containerizers, ",")) {
+    if (type == "mesos") {
+      Try<MesosContainerizer*> containerizer =
+        MesosContainerizer::create(flags, local);
+      if (containerizer.isError()) {
+        return Error("Could not create MesosContainerizer: " +
+                     containerizer.error());
+      } else {
+        containerizers.push_back(containerizer.get());
+      }
+    } else if (type == "external") {
+      Try<Containerizer*> containerizer =
+        ExternalContainerizer::create(flags, local);
+      if (containerizer.isError()) {
+        return Error("Could not create ExternalContainerizer: " +
+                     containerizer.error());
+      } else {
+        containerizers.push_back(containerizer.get());
+      }
+    } else {
+      return Error("Unknown or unsupported containerizer: " + type);
+    }
+  }
+
+  if (containerizers.size() == 1) {
+    return containerizers.front();
   }
 
-  Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, local);
+  Try<ComposingContainerizer*> containerizer =
+    ComposingContainerizer::create(containerizers);
 
   if (containerizer.isError()) {
     return Error(containerizer.error());

http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 146c401..5ea2f38 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -262,6 +262,16 @@ public:
         "The path to the external containerizer executable used when\n"
         "external isolation is activated (--isolation=external).\n");
 
+    add(&Flags::containerizers,
+        "containerizers",
+        "Comma separated list of containerizer implementations\n"
+        "to compose in order to provide containerization.\n"
+        "Available options are 'mesos', 'external', and\n"
+        "'docker' (on Linux). The order the containerizers\n"
+        "are specified is the order they are tried\n"
+        "(--containerizers=mesos).\n",
+        "mesos");
+
     add(&Flags::default_container_image,
         "default_container_image",
         "The default container image to use if not specified by a task,\n"
@@ -330,6 +340,7 @@ public:
 #endif
   Option<std::string> credential;
   Option<std::string> containerizer_path;
+  std::string containerizers;
   Option<std::string> default_container_image;
 #ifdef WITH_NETWORK_ISOLATOR
   uint16_t ephemeral_ports_per_container;

Reply via email to