Repository: mesos Updated Branches: refs/heads/master 67652f436 -> 73161e54c
Fixed containerizer not receiving calls when launching. Review: https://reviews.apache.org/r/26486 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/73161e54 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/73161e54 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/73161e54 Branch: refs/heads/master Commit: 73161e54caaab9506951cfc1c91fdc0cc27663e2 Parents: 67652f4 Author: Timothy Chen <[email protected]> Authored: Fri Oct 31 15:38:39 2014 -0700 Committer: Timothy Chen <[email protected]> Committed: Fri Oct 31 15:38:40 2014 -0700 ---------------------------------------------------------------------- src/Makefile.am | 1 + src/slave/containerizer/composing.cpp | 267 ++++++++++++----------- src/tests/composing_containerizer_tests.cpp | 165 ++++++++++++++ 3 files changed, 304 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/73161e54/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 2d72a70..e6a0715 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1180,6 +1180,7 @@ mesos_tests_SOURCES = \ tests/attributes_tests.cpp \ tests/authentication_tests.cpp \ tests/authorization_tests.cpp \ + tests/composing_containerizer_tests.cpp \ tests/containerizer.cpp \ tests/containerizer_tests.cpp \ tests/credentials_tests.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/73161e54/src/slave/containerizer/composing.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/composing.cpp b/src/slave/containerizer/composing.cpp index 9022700..a6ae817 100644 --- a/src/slave/containerizer/composing.cpp +++ b/src/slave/containerizer/composing.cpp @@ -48,7 +48,8 @@ class ComposingContainerizerProcess { public: ComposingContainerizerProcess( - const vector<Containerizer*>& containerizers); + const vector<Containerizer*>& containerizers) + : containerizers_(containerizers) {} virtual ~ComposingContainerizerProcess(); @@ -98,18 +99,7 @@ private: Future<bool> _launch( const ContainerID& containerId, - const ExecutorInfo& executorInfo, - const string& directory, - const Option<string>& user, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint, - vector<Containerizer*>::iterator containerizer, - bool launched); - - Future<bool> _launch( - const ContainerID& containerId, - const TaskInfo& taskInfo, + const Option<TaskInfo>& taskInfo, const ExecutorInfo& executorInfo, const string& directory, const Option<string>& user, @@ -120,7 +110,22 @@ private: bool launched); vector<Containerizer*> containerizers_; - hashmap<Containerizer*, hashset<ContainerID> > containers_; + + // The states that the composing containerizer cares about for the + // container it is asked to launch. + enum State { + LAUNCHING, + LAUNCHED, + DESTROYED + }; + + struct Container + { + State state; + Containerizer* containerizer; + }; + + hashmap<ContainerID, Container*> containers_; }; @@ -235,21 +240,16 @@ Future<hashset<ContainerID> > ComposingContainerizer::containers() } -ComposingContainerizerProcess::ComposingContainerizerProcess( - const vector<Containerizer*>& containerizers) - : containerizers_(containerizers) -{ - foreach (Containerizer* containerizer, containerizers_) { - containers_[containerizer] = hashset<ContainerID>(); - } -} - - ComposingContainerizerProcess::~ComposingContainerizerProcess() { foreach (Containerizer* containerizer, containerizers_) { delete containerizer; } + + foreachvalue (Container* container, containers_) { + delete container; + } + containerizers_.clear(); containers_.clear(); } @@ -288,7 +288,12 @@ Future<Nothing> ComposingContainerizerProcess::__recover( Containerizer* containerizer, const hashset<ContainerID>& containers) { - containers_[containerizer] = containers; + foreach (const ContainerID& containerId, containers) { + Container* container = new Container(); + container->state = LAUNCHED; + container->containerizer = containerizer; + containers_[containerId] = container; + } return Nothing(); } @@ -308,10 +313,20 @@ Future<bool> ComposingContainerizerProcess::launch( const PID<Slave>& slavePid, bool checkpoint) { + if (containers_.contains(containerId)) { + return Failure("Container '" + containerId.value() + + "' is already launching"); + } + // Try each containerizer. If none of them handle the // TaskInfo/ExecutorInfo then return a Failure. vector<Containerizer*>::iterator containerizer = containerizers_.begin(); + Container* container = new Container(); + container->state = LAUNCHING; + container->containerizer = *containerizer; + containers_[containerId] = container; + return (*containerizer)->launch( containerId, executorInfo, @@ -323,6 +338,7 @@ Future<bool> ComposingContainerizerProcess::launch( .then(defer(self(), &Self::_launch, containerId, + None(), executorInfo, directory, user, @@ -336,6 +352,7 @@ Future<bool> ComposingContainerizerProcess::launch( Future<bool> ComposingContainerizerProcess::_launch( const ContainerID& containerId, + const Option<TaskInfo>& taskInfo, const ExecutorInfo& executorInfo, const string& directory, const Option<string>& user, @@ -345,8 +362,19 @@ Future<bool> ComposingContainerizerProcess::_launch( vector<Containerizer*>::iterator containerizer, bool launched) { + // The container struct won't be cleaned up by destroy because + // in destroy we only forward the destroy, and wait until the + // launch returns and clean up here. + CHECK(containers_.contains(containerId)); + Container* container = containers_[containerId]; + if (container->state == DESTROYED) { + containers_.erase(containerId); + delete container; + return Failure("Container was destroyed while launching"); + } + if (launched) { - containers_[*containerizer].insert(containerId); + container->state = LAUNCHED; return true; } @@ -354,28 +382,45 @@ Future<bool> ComposingContainerizerProcess::_launch( ++containerizer; if (containerizer == containerizers_.end()) { + containers_.erase(containerId); + delete container; return false; } - return (*containerizer)->launch( - containerId, - executorInfo, - directory, - user, - slaveId, - slavePid, - checkpoint) - .then(defer(self(), - &Self::_launch, - containerId, - executorInfo, - directory, - user, - slaveId, - slavePid, - checkpoint, - containerizer, - lambda::_1)); + container->containerizer = *containerizer; + + Future<bool> f = taskInfo.isSome() ? + (*containerizer)->launch( + containerId, + taskInfo.get(), + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint) : + (*containerizer)->launch( + containerId, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint); + + return f.then( + defer(self(), + &Self::_launch, + containerId, + taskInfo, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint, + containerizer, + lambda::_1)); } @@ -389,56 +434,19 @@ Future<bool> ComposingContainerizerProcess::launch( const PID<Slave>& slavePid, bool checkpoint) { + if (containers_.contains(containerId)) { + return Failure("Container '" + stringify(containerId) + + "' is already launching"); + } + // Try each containerizer. If none of them handle the // TaskInfo/ExecutorInfo then return a Failure. vector<Containerizer*>::iterator containerizer = containerizers_.begin(); - return (*containerizer)->launch( - containerId, - taskInfo, - executorInfo, - directory, - user, - slaveId, - slavePid, - checkpoint) - .then(defer(self(), - &Self::_launch, - containerId, - taskInfo, - executorInfo, - directory, - user, - slaveId, - slavePid, - checkpoint, - containerizer, - lambda::_1)); -} - -Future<bool> ComposingContainerizerProcess::_launch( - const ContainerID& containerId, - const TaskInfo& taskInfo, - const ExecutorInfo& executorInfo, - const string& directory, - const Option<string>& user, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint, - vector<Containerizer*>::iterator containerizer, - bool launched) -{ - if (launched) { - containers_[*containerizer].insert(containerId); - return true; - } - - // Try the next containerizer. - ++containerizer; - - if (containerizer == containerizers_.end()) { - return false; - } + Container* container = new Container(); + container->state = LAUNCHING; + container->containerizer = *containerizer; + containers_[containerId] = container; return (*containerizer)->launch( containerId, @@ -468,76 +476,77 @@ Future<Nothing> ComposingContainerizerProcess::update( const ContainerID& containerId, const Resources& resources) { - foreachpair (Containerizer* containerizer, - const hashset<ContainerID>& containers, - containers_) { - if (containers.contains(containerId)) { - return containerizer->update(containerId, resources); - } + if (!containers_.contains(containerId)) { + return Failure("Container '" + containerId.value() + "' not found"); } - return Failure("No container found"); + return containers_[containerId]->containerizer->update( + containerId, resources); } Future<ResourceStatistics> ComposingContainerizerProcess::usage( const ContainerID& containerId) { - foreachpair (Containerizer* containerizer, - const hashset<ContainerID>& containers, - containers_) { - if (containers.contains(containerId)) { - return containerizer->usage(containerId); - } + if (!containers_.contains(containerId)) { + return Failure("Container '" + containerId.value() + "' not found"); } - return Failure("No container found"); + return containers_[containerId]->containerizer->usage(containerId); } Future<containerizer::Termination> ComposingContainerizerProcess::wait( const ContainerID& containerId) { - foreachpair (Containerizer* containerizer, - const hashset<ContainerID>& containers, - containers_) { - if (containers.contains(containerId)) { - return containerizer->wait(containerId); - } + if (!containers_.contains(containerId)) { + return Failure("Container '" + containerId.value() + "' not found"); } - return Failure("No container found"); + return containers_[containerId]->containerizer->wait(containerId); } void ComposingContainerizerProcess::destroy(const ContainerID& containerId) { - foreachpair (Containerizer* containerizer, - const hashset<ContainerID>& containers, - containers_) { - if (containers.contains(containerId)) { - containerizer->destroy(containerId); - break; - } + if (!containers_.contains(containerId)) { + LOG(WARNING) << "Container '" << containerId.value() << "' not found"; + return; } -} + Container* container = containers_[containerId]; -// TODO(benh): Move into stout/hashset.hpp. -template <typename Elem> -hashset<Elem> merge(const std::list<hashset<Elem> >& list) -{ - hashset<Elem> result; - foreach (const hashset<Elem>& set, list) { - result.insert(set.begin(), set.end()); + if (container->state == DESTROYED) { + LOG(WARNING) << "Container '" << containerId.value() + << "' is already destroyed"; + return; } - return result; + + // It's ok to forward destroy to any containerizer which is currently + // launching the container, because we expect each containerizer to + // handle calling destroy on non-existing container. + // The composing containerizer will not move to the next + // containerizer for a container that is destroyed as well. + container->containerizer->destroy(containerId); + + if (container->state == LAUNCHING) { + // Record the fact that this container was asked to be destroyed + // so that we won't try and launch this container using any other + // containerizers in the event the current containerizer has + // decided it can't launch the container. + container->state = DESTROYED; + return; + } + + // If the container is launched, then we can simply cleanup. + containers_.erase(containerId); + delete container; } Future<hashset<ContainerID> > ComposingContainerizerProcess::containers() { - return merge(containers_.values()); + return containers_.keys(); } } // namespace slave { http://git-wip-us.apache.org/repos/asf/mesos/blob/73161e54/src/tests/composing_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/composing_containerizer_tests.cpp b/src/tests/composing_containerizer_tests.cpp new file mode 100644 index 0000000..5ab5a36 --- /dev/null +++ b/src/tests/composing_containerizer_tests.cpp @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <vector> + +#include <process/future.hpp> +#include <process/gmock.hpp> + +#include <stout/option.hpp> + +#include "messages/messages.hpp" + +#include "slave/containerizer/containerizer.hpp" +#include "slave/containerizer/composing.hpp" + +#include "tests/mesos.hpp" + +using namespace mesos; +using namespace mesos::internal; +using namespace mesos::internal::slave; +using namespace mesos::internal::tests; + +using namespace process; + +using std::vector; + +using testing::_; +using testing::Return; + +class ComposingContainerizerTest : public MesosTest {}; + +class MockContainerizer : public slave::Containerizer +{ +public: + MOCK_METHOD1( + recover, + process::Future<Nothing>( + const Option<slave::state::SlaveState>&)); + + MOCK_METHOD7( + launch, + process::Future<bool>( + const ContainerID&, + const ExecutorInfo&, + const std::string&, + const Option<std::string>&, + const SlaveID&, + const process::PID<Slave>&, + bool)); + + MOCK_METHOD8( + launch, + process::Future<bool>( + const ContainerID&, + const TaskInfo&, + const ExecutorInfo&, + const std::string&, + const Option<std::string>&, + const SlaveID&, + const process::PID<Slave>&, + bool)); + + MOCK_METHOD2( + update, + process::Future<Nothing>( + const ContainerID&, + const Resources&)); + + MOCK_METHOD1( + usage, + process::Future<ResourceStatistics>( + const ContainerID&)); + + MOCK_METHOD1( + wait, + process::Future<containerizer::Termination>( + const ContainerID&)); + + MOCK_METHOD1( + destroy, + void(const ContainerID&)); + + MOCK_METHOD0( + containers, + process::Future<hashset<ContainerID> >()); +}; + + +// This test checks if destroy is called while container is being +// launched, the composing containerizer still calls the underlying +// containerizer's destroy and skip calling the rest of the +// containerizers. +TEST_F(ComposingContainerizerTest, DestroyWhileLaunching) +{ + vector<Containerizer*> containerizers; + + MockContainerizer* mockContainerizer = new MockContainerizer(); + MockContainerizer* mockContainerizer2 = new MockContainerizer(); + + containerizers.push_back(mockContainerizer); + containerizers.push_back(mockContainerizer2); + + ComposingContainerizer containerizer(containerizers); + ContainerID containerId; + containerId.set_value("container"); + TaskInfo taskInfo; + ExecutorInfo executorInfo; + SlaveID slaveId; + PID<Slave> slavePid; + + Promise<bool> launchPromise; + + EXPECT_CALL(*mockContainerizer, launch(_, _, _, _, _, _, _, _)) + .WillOnce(Return(launchPromise.future())); + + Future<Nothing> destroy; + + EXPECT_CALL(*mockContainerizer, destroy(_)) + .WillOnce(FutureSatisfy(&destroy)); + + Future<bool> launch = containerizer.launch( + containerId, + taskInfo, + executorInfo, + "dir", + "user", + slaveId, + slavePid, + false); + + Resources resources = Resources::parse("cpus:1;mem:256").get(); + + EXPECT_TRUE(launch.isPending()); + + containerizer.destroy(containerId); + + EXPECT_CALL(*mockContainerizer2, launch(_, _, _, _, _, _, _, _)) + .Times(0); + + // We make sure the destroy is being called on the first containerizer. + // The second containerizer shouldn't be called as well since the + // container is already destroyed. + AWAIT_READY(destroy); + + launchPromise.set(false); + AWAIT_FAILED(launch); +}
