Added composing containerizer and --containerizers flag. Review: https://reviews.apache.org/r/23462
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/29910a6e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/29910a6e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/29910a6e Branch: refs/heads/master Commit: 29910a6e27b8e9374e323c05ffc63659b0180388 Parents: 1ae6ade Author: Benjamin Hindman <[email protected]> Authored: Sun Jun 22 17:54:53 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Mon Aug 4 09:15:50 2014 -0700 ---------------------------------------------------------------------- src/Makefile.am | 2 + src/slave/containerizer/composing.cpp | 545 +++++++++++++++++++++++++ src/slave/containerizer/composing.hpp | 100 +++++ src/slave/containerizer/containerizer.cpp | 39 +- src/slave/flags.hpp | 11 + 5 files changed, 693 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index c7ed168..0d9e3f0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -261,6 +261,8 @@ libmesos_no_3rdparty_la_SOURCES = \ slave/state.cpp \ slave/slave.cpp \ slave/containerizer/containerizer.cpp \ + slave/containerizer/composing.cpp \ + slave/containerizer/composing.hpp \ slave/containerizer/external_containerizer.cpp \ slave/containerizer/isolator.cpp \ slave/containerizer/launcher.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/containerizer/composing.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/composing.cpp b/src/slave/containerizer/composing.cpp new file mode 100644 index 0000000..9b36d91 --- /dev/null +++ b/src/slave/containerizer/composing.cpp @@ -0,0 +1,545 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <list> +#include <vector> + +#include <process/collect.hpp> +#include <process/defer.hpp> +#include <process/dispatch.hpp> + +#include <stout/hashmap.hpp> +#include <stout/hashset.hpp> +#include <stout/lambda.hpp> + +#include "slave/state.hpp" + +#include "slave/containerizer/containerizer.hpp" +#include "slave/containerizer/composing.hpp" + +using std::list; +using std::string; +using std::vector; + +using namespace process; + +namespace mesos { +namespace internal { +namespace slave { + + +class ComposingContainerizerProcess + : public Process<ComposingContainerizerProcess> +{ +public: + ComposingContainerizerProcess( + const vector<Containerizer*>& containerizers); + + virtual ~ComposingContainerizerProcess(); + + Future<Nothing> recover( + const Option<state::SlaveState>& state); + + Future<bool> launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint); + + Future<bool> launch( + const ContainerID& containerId, + const TaskInfo& taskInfo, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint); + + Future<Nothing> update( + const ContainerID& containerId, + const Resources& resources); + + Future<ResourceStatistics> usage( + const ContainerID& containerId); + + Future<containerizer::Termination> wait( + const ContainerID& containerId); + + void destroy(const ContainerID& containerId); + + Future<hashset<ContainerID> > containers(); + +private: + // Continuations. + Future<Nothing> _recover(); + Future<Nothing> __recover( + Containerizer* containerizer, + const hashset<ContainerID>& containers); + static Future<Nothing> ___recover(); + + Future<bool> _launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint, + vector<Containerizer*>::iterator containerizer, + bool launched); + + Future<bool> _launch( + const ContainerID& containerId, + const TaskInfo& taskInfo, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint, + vector<Containerizer*>::iterator containerizer, + bool launched); + + vector<Containerizer*> containerizers_; + hashmap<Containerizer*, hashset<ContainerID> > containers_; +}; + + +Try<ComposingContainerizer*> ComposingContainerizer::create( + const vector<Containerizer*>& containerizers) +{ + return new ComposingContainerizer(containerizers); +} + + +ComposingContainerizer::ComposingContainerizer( + const vector<Containerizer*>& containerizers) +{ + process = new ComposingContainerizerProcess(containerizers); + spawn(process); +} + + +ComposingContainerizer::~ComposingContainerizer() +{ + terminate(process); + process::wait(process); + delete process; +} + + +Future<Nothing> ComposingContainerizer::recover( + const Option<state::SlaveState>& state) +{ + return dispatch(process, &ComposingContainerizerProcess::recover, state); +} + + +Future<bool> ComposingContainerizer::launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint) +{ + return dispatch(process, + &ComposingContainerizerProcess::launch, + containerId, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint); +} + + +Future<bool> ComposingContainerizer::launch( + const ContainerID& containerId, + const TaskInfo& taskInfo, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint) +{ + return dispatch(process, + &ComposingContainerizerProcess::launch, + containerId, + taskInfo, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint); +} + + +Future<Nothing> ComposingContainerizer::update( + const ContainerID& containerId, + const Resources& resources) +{ + return dispatch(process, + &ComposingContainerizerProcess::update, + containerId, + resources); +} + + +Future<ResourceStatistics> ComposingContainerizer::usage( + const ContainerID& containerId) +{ + return dispatch(process, &ComposingContainerizerProcess::usage, containerId); +} + + +Future<containerizer::Termination> ComposingContainerizer::wait( + const ContainerID& containerId) +{ + return dispatch(process, &ComposingContainerizerProcess::wait, containerId); +} + + +void ComposingContainerizer::destroy(const ContainerID& containerId) +{ + dispatch(process, &ComposingContainerizerProcess::destroy, containerId); +} + + +Future<hashset<ContainerID> > ComposingContainerizer::containers() +{ + return dispatch(process, &ComposingContainerizerProcess::containers); +} + + +ComposingContainerizerProcess::ComposingContainerizerProcess( + const vector<Containerizer*>& containerizers) + : containerizers_(containerizers) +{ + foreach (Containerizer* containerizer, containerizers_) { + containers_[containerizer] = hashset<ContainerID>(); + } +} + + +ComposingContainerizerProcess::~ComposingContainerizerProcess() +{ + foreach (Containerizer* containerizer, containerizers_) { + delete containerizer; + } + containerizers_.clear(); + containers_.clear(); +} + + +Future<Nothing> ComposingContainerizerProcess::recover( + const Option<state::SlaveState>& state) +{ + // Recover each containerizer in parallel. + list<Future<Nothing> > futures; + foreach (Containerizer* containerizer, containerizers_) { + futures.push_back(containerizer->recover(state)); + } + + return collect(futures) + .then(defer(self(), &Self::_recover)); +} + + +Future<Nothing> ComposingContainerizerProcess::_recover() +{ + // Now collect all the running containers in order to multiplex. + list<Future<Nothing> > futures; + foreach (Containerizer* containerizer, containerizers_) { + Future<Nothing> future = containerizer->containers() + .then(defer(self(), &Self::__recover, containerizer, lambda::_1)); + futures.push_back(future); + } + + return collect(futures) + .then(lambda::bind(&Self::___recover)); +} + + +Future<Nothing> ComposingContainerizerProcess::__recover( + Containerizer* containerizer, + const hashset<ContainerID>& containers) +{ + containers_[containerizer] = containers; + return Nothing(); +} + + +Future<Nothing> ComposingContainerizerProcess::___recover() +{ + return Nothing(); +} + + +Future<bool> ComposingContainerizerProcess::launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint) +{ + // Try each containerizer. If none of them handle the + // TaskInfo/ExecutorInfo then return a Failure. + vector<Containerizer*>::iterator containerizer = containerizers_.begin(); + + return (*containerizer)->launch( + containerId, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint) + .then(defer(self(), + &Self::_launch, + containerId, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint, + containerizer, + lambda::_1)); +} + + +Future<bool> ComposingContainerizerProcess::_launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint, + vector<Containerizer*>::iterator containerizer, + bool launched) +{ + if (launched) { + containers_[*containerizer].insert(containerId); + return true; + } + + // Try the next containerizer. + ++containerizer; + + if (containerizer == containerizers_.end()) { + return false; + } + + return (*containerizer)->launch( + containerId, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint) + .then(defer(self(), + &Self::_launch, + containerId, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint, + containerizer, + lambda::_1)); +} + + +Future<bool> ComposingContainerizerProcess::launch( + const ContainerID& containerId, + const TaskInfo& taskInfo, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint) +{ + // Try each containerizer. If none of them handle the + // TaskInfo/ExecutorInfo then return a Failure. + vector<Containerizer*>::iterator containerizer = containerizers_.begin(); + + return (*containerizer)->launch( + containerId, + taskInfo, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint) + .then(defer(self(), + &Self::_launch, + containerId, + taskInfo, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint, + containerizer, + lambda::_1)); +} + +Future<bool> ComposingContainerizerProcess::_launch( + const ContainerID& containerId, + const TaskInfo& taskInfo, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint, + vector<Containerizer*>::iterator containerizer, + bool launched) +{ + if (launched) { + containers_[*containerizer].insert(containerId); + return true; + } + + // Try the next containerizer. + ++containerizer; + + if (containerizer == containerizers_.end()) { + return false; + } + + return (*containerizer)->launch( + containerId, + taskInfo, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint) + .then(defer(self(), + &Self::_launch, + containerId, + taskInfo, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint, + containerizer, + lambda::_1)); +} + + +Future<Nothing> ComposingContainerizerProcess::update( + const ContainerID& containerId, + const Resources& resources) +{ + foreachpair (Containerizer* containerizer, + const hashset<ContainerID>& containers, + containers_) { + if (containers.contains(containerId)) { + return containerizer->update(containerId, resources); + } + } + + return Failure("No container found"); +} + + +Future<ResourceStatistics> ComposingContainerizerProcess::usage( + const ContainerID& containerId) +{ + foreachpair (Containerizer* containerizer, + const hashset<ContainerID>& containers, + containers_) { + if (containers.contains(containerId)) { + return containerizer->usage(containerId); + } + } + + return Failure("No container found"); +} + + +Future<containerizer::Termination> ComposingContainerizerProcess::wait( + const ContainerID& containerId) +{ + foreachpair (Containerizer* containerizer, + const hashset<ContainerID>& containers, + containers_) { + if (containers.contains(containerId)) { + return containerizer->wait(containerId); + } + } + + return Failure("No container found"); +} + + +void ComposingContainerizerProcess::destroy(const ContainerID& containerId) +{ + foreachpair (Containerizer* containerizer, + const hashset<ContainerID>& containers, + containers_) { + if (containers.contains(containerId)) { + containerizer->destroy(containerId); + break; + } + } +} + + +// TODO(benh): Move into stout/hashset.hpp +template <typename Elem> +hashset<Elem> merge(const std::list<hashset<Elem> >& list) +{ + hashset<Elem> result; + foreach (const hashset<Elem>& set, list) { + result.insert(set.begin(), set.end()); + } + return result; +} + + +Future<hashset<ContainerID> > ComposingContainerizerProcess::containers() +{ + return merge(containers_.values()); +} + +} // namespace slave { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/containerizer/composing.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/composing.hpp b/src/slave/containerizer/composing.hpp new file mode 100644 index 0000000..f1e60b0 --- /dev/null +++ b/src/slave/containerizer/composing.hpp @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __COMPOSING_CONTAINERIZER_HPP__ +#define __COMPOSING_CONTAINERIZER_HPP__ + +#include <map> + +#include <mesos/mesos.hpp> +#include <mesos/resources.hpp> + +#include <mesos/containerizer/containerizer.hpp> + +#include <process/future.hpp> +#include <process/process.hpp> + +#include <stout/hashmap.hpp> +#include <stout/hashset.hpp> +#include <stout/option.hpp> +#include <stout/try.hpp> + + +namespace mesos { +namespace internal { +namespace slave { + +// Forward declaration. +class ComposingContainerizerProcess; + +class ComposingContainerizer : public Containerizer +{ +public: + static Try<ComposingContainerizer*> create( + const std::vector<Containerizer*>& containerizers); + + ComposingContainerizer( + const std::vector<Containerizer*>& containerizers); + + virtual ~ComposingContainerizer(); + + virtual process::Future<Nothing> recover( + const Option<state::SlaveState>& state); + + virtual process::Future<bool> launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const std::string& directory, + const Option<std::string>& user, + const SlaveID& slaveId, + const process::PID<Slave>& slavePid, + bool checkpoint); + + virtual process::Future<bool> launch( + const ContainerID& containerId, + const TaskInfo& taskInfo, + const ExecutorInfo& executorInfo, + const std::string& directory, + const Option<std::string>& user, + const SlaveID& slaveId, + const process::PID<Slave>& slavePid, + bool checkpoint); + + virtual process::Future<Nothing> update( + const ContainerID& containerId, + const Resources& resources); + + virtual process::Future<ResourceStatistics> usage( + const ContainerID& containerId); + + virtual process::Future<containerizer::Termination> wait( + const ContainerID& containerId); + + virtual void destroy(const ContainerID& containerId); + + virtual process::Future<hashset<ContainerID> > containers(); + +private: + ComposingContainerizerProcess* process; +}; + +} // namespace slave { +} // namespace internal { +} // namespace mesos { + +#endif // __COMPOSING_CONTAINERIZER_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/containerizer/containerizer.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp index 1b71f33..d62d25d 100644 --- a/src/slave/containerizer/containerizer.cpp +++ b/src/slave/containerizer/containerizer.cpp @@ -34,6 +34,7 @@ #ifdef __linux__ #include "slave/containerizer/linux_launcher.hpp" #endif // __linux__ +#include "slave/containerizer/composing.hpp" #include "slave/containerizer/containerizer.hpp" #include "slave/containerizer/isolator.hpp" #include "slave/containerizer/launcher.hpp" @@ -153,12 +154,42 @@ Try<Resources> Containerizer::resources(const Flags& flags) Try<Containerizer*> Containerizer::create(const Flags& flags, bool local) { - if (flags.isolation == "external") { - return new ExternalContainerizer(flags); + // TODO(benh): We need to store which containerizer or + // containerizers were being used. See MESOS-1663. + + // Create containerizer(s). + vector<Containerizer*> containerizers; + + foreach (const string& type, strings::split(flags.containerizers, ",")) { + if (type == "mesos") { + Try<MesosContainerizer*> containerizer = + MesosContainerizer::create(flags, local); + if (containerizer.isError()) { + return Error("Could not create MesosContainerizer: " + + containerizer.error()); + } else { + containerizers.push_back(containerizer.get()); + } + } else if (type == "external") { + Try<Containerizer*> containerizer = + ExternalContainerizer::create(flags, local); + if (containerizer.isError()) { + return Error("Could not create ExternalContainerizer: " + + containerizer.error()); + } else { + containerizers.push_back(containerizer.get()); + } + } else { + return Error("Unknown or unsupported containerizer: " + type); + } + } + + if (containerizers.size() == 1) { + return containerizers.front(); } - Try<MesosContainerizer*> containerizer = - MesosContainerizer::create(flags, local); + Try<ComposingContainerizer*> containerizer = + ComposingContainerizer::create(containerizers); if (containerizer.isError()) { return Error(containerizer.error()); http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/flags.hpp ---------------------------------------------------------------------- diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp index 146c401..5ea2f38 100644 --- a/src/slave/flags.hpp +++ b/src/slave/flags.hpp @@ -262,6 +262,16 @@ public: "The path to the external containerizer executable used when\n" "external isolation is activated (--isolation=external).\n"); + add(&Flags::containerizers, + "containerizers", + "Comma separated list of containerizer implementations\n" + "to compose in order to provide containerization.\n" + "Available options are 'mesos', 'external', and\n" + "'docker' (on Linux). The order the containerizers\n" + "are specified is the order they are tried\n" + "(--containerizers=mesos).\n", + "mesos"); + add(&Flags::default_container_image, "default_container_image", "The default container image to use if not specified by a task,\n" @@ -330,6 +340,7 @@ public: #endif Option<std::string> credential; Option<std::string> containerizer_path; + std::string containerizers; Option<std::string> default_container_image; #ifdef WITH_NETWORK_ISOLATOR uint16_t ephemeral_ports_per_container;
