Repository: mesos
Updated Branches:
  refs/heads/master 67652f436 -> 73161e54c


Fixed containerizer not receiving calls when launching.

Review: https://reviews.apache.org/r/26486


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/73161e54
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/73161e54
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/73161e54

Branch: refs/heads/master
Commit: 73161e54caaab9506951cfc1c91fdc0cc27663e2
Parents: 67652f4
Author: Timothy Chen <[email protected]>
Authored: Fri Oct 31 15:38:39 2014 -0700
Committer: Timothy Chen <[email protected]>
Committed: Fri Oct 31 15:38:40 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                             |   1 +
 src/slave/containerizer/composing.cpp       | 267 ++++++++++++-----------
 src/tests/composing_containerizer_tests.cpp | 165 ++++++++++++++
 3 files changed, 304 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/73161e54/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 2d72a70..e6a0715 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1180,6 +1180,7 @@ mesos_tests_SOURCES =                             \
   tests/attributes_tests.cpp                   \
   tests/authentication_tests.cpp               \
   tests/authorization_tests.cpp                        \
+  tests/composing_containerizer_tests.cpp       \
   tests/containerizer.cpp                      \
   tests/containerizer_tests.cpp                        \
   tests/credentials_tests.cpp                  \

http://git-wip-us.apache.org/repos/asf/mesos/blob/73161e54/src/slave/containerizer/composing.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/composing.cpp b/src/slave/containerizer/composing.cpp
index 9022700..a6ae817 100644
--- a/src/slave/containerizer/composing.cpp
+++ b/src/slave/containerizer/composing.cpp
@@ -48,7 +48,8 @@ class ComposingContainerizerProcess
 {
 public:
   ComposingContainerizerProcess(
-      const vector<Containerizer*>& containerizers);
+      const vector<Containerizer*>& containerizers)
+    : containerizers_(containerizers) {}
 
   virtual ~ComposingContainerizerProcess();
 
@@ -98,18 +99,7 @@ private:
 
   Future<bool> _launch(
       const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const string& directory,
-      const Option<string>& user,
-      const SlaveID& slaveId,
-      const PID<Slave>& slavePid,
-      bool checkpoint,
-      vector<Containerizer*>::iterator containerizer,
-      bool launched);
-
-  Future<bool> _launch(
-      const ContainerID& containerId,
-      const TaskInfo& taskInfo,
+      const Option<TaskInfo>& taskInfo,
       const ExecutorInfo& executorInfo,
       const string& directory,
       const Option<string>& user,
@@ -120,7 +110,22 @@ private:
       bool launched);
 
   vector<Containerizer*> containerizers_;
-  hashmap<Containerizer*, hashset<ContainerID> > containers_;
+
+  // The states that the composing containerizer cares about for the
+  // container it is asked to launch.
+  enum State {
+    LAUNCHING,
+    LAUNCHED,
+    DESTROYED
+  };
+
+  struct Container
+  {
+    State state;
+    Containerizer* containerizer;
+  };
+
+  hashmap<ContainerID, Container*> containers_;
 };
 
 
@@ -235,21 +240,16 @@ Future<hashset<ContainerID> > ComposingContainerizer::containers()
 }
 
 
-ComposingContainerizerProcess::ComposingContainerizerProcess(
-    const vector<Containerizer*>& containerizers)
-  : containerizers_(containerizers)
-{
-  foreach (Containerizer* containerizer, containerizers_) {
-    containers_[containerizer] = hashset<ContainerID>();
-  }
-}
-
-
 ComposingContainerizerProcess::~ComposingContainerizerProcess()
 {
   foreach (Containerizer* containerizer, containerizers_) {
     delete containerizer;
   }
+
+  foreachvalue (Container* container, containers_) {
+    delete container;
+  }
+
   containerizers_.clear();
   containers_.clear();
 }
@@ -288,7 +288,12 @@ Future<Nothing> ComposingContainerizerProcess::__recover(
     Containerizer* containerizer,
     const hashset<ContainerID>& containers)
 {
-  containers_[containerizer] = containers;
+  foreach (const ContainerID& containerId, containers) {
+    Container* container = new Container();
+    container->state = LAUNCHED;
+    container->containerizer = containerizer;
+    containers_[containerId] = container;
+  }
   return Nothing();
 }
 
@@ -308,10 +313,20 @@ Future<bool> ComposingContainerizerProcess::launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
+  if (containers_.contains(containerId)) {
+    return Failure("Container '" + containerId.value() +
+                   "' is already launching");
+  }
+
   // Try each containerizer. If none of them handle the
   // TaskInfo/ExecutorInfo then return a Failure.
   vector<Containerizer*>::iterator containerizer = containerizers_.begin();
 
+  Container* container = new Container();
+  container->state = LAUNCHING;
+  container->containerizer = *containerizer;
+  containers_[containerId] = container;
+
   return (*containerizer)->launch(
       containerId,
       executorInfo,
@@ -323,6 +338,7 @@ Future<bool> ComposingContainerizerProcess::launch(
     .then(defer(self(),
                 &Self::_launch,
                 containerId,
+                None(),
                 executorInfo,
                 directory,
                 user,
@@ -336,6 +352,7 @@ Future<bool> ComposingContainerizerProcess::launch(
 
 Future<bool> ComposingContainerizerProcess::_launch(
     const ContainerID& containerId,
+    const Option<TaskInfo>& taskInfo,
     const ExecutorInfo& executorInfo,
     const string& directory,
     const Option<string>& user,
@@ -345,8 +362,19 @@ Future<bool> ComposingContainerizerProcess::_launch(
     vector<Containerizer*>::iterator containerizer,
     bool launched)
 {
+  // The container struct won't be cleaned up by destroy because
+  // in destroy we only forward the destroy, and wait until the
+  // launch returns and clean up here.
+  CHECK(containers_.contains(containerId));
+  Container* container = containers_[containerId];
+  if (container->state == DESTROYED) {
+    containers_.erase(containerId);
+    delete container;
+    return Failure("Container was destroyed while launching");
+  }
+
   if (launched) {
-    containers_[*containerizer].insert(containerId);
+    container->state = LAUNCHED;
     return true;
   }
 
@@ -354,28 +382,45 @@ Future<bool> ComposingContainerizerProcess::_launch(
   ++containerizer;
 
   if (containerizer == containerizers_.end()) {
+    containers_.erase(containerId);
+    delete container;
     return false;
   }
 
-  return (*containerizer)->launch(
-      containerId,
-      executorInfo,
-      directory,
-      user,
-      slaveId,
-      slavePid,
-      checkpoint)
-    .then(defer(self(),
-                &Self::_launch,
-                containerId,
-                executorInfo,
-                directory,
-                user,
-                slaveId,
-                slavePid,
-                checkpoint,
-                containerizer,
-                lambda::_1));
+  container->containerizer = *containerizer;
+
+  Future<bool> f = taskInfo.isSome() ?
+      (*containerizer)->launch(
+          containerId,
+          taskInfo.get(),
+          executorInfo,
+          directory,
+          user,
+          slaveId,
+          slavePid,
+          checkpoint) :
+      (*containerizer)->launch(
+          containerId,
+          executorInfo,
+          directory,
+          user,
+          slaveId,
+          slavePid,
+          checkpoint);
+
+  return f.then(
+      defer(self(),
+            &Self::_launch,
+            containerId,
+            taskInfo,
+            executorInfo,
+            directory,
+            user,
+            slaveId,
+            slavePid,
+            checkpoint,
+            containerizer,
+            lambda::_1));
 }
 
 
@@ -389,56 +434,19 @@ Future<bool> ComposingContainerizerProcess::launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
+  if (containers_.contains(containerId)) {
+    return Failure("Container '" + stringify(containerId) +
+                   "' is already launching");
+  }
+
   // Try each containerizer. If none of them handle the
   // TaskInfo/ExecutorInfo then return a Failure.
   vector<Containerizer*>::iterator containerizer = containerizers_.begin();
 
-  return (*containerizer)->launch(
-      containerId,
-      taskInfo,
-      executorInfo,
-      directory,
-      user,
-      slaveId,
-      slavePid,
-      checkpoint)
-    .then(defer(self(),
-                &Self::_launch,
-                containerId,
-                taskInfo,
-                executorInfo,
-                directory,
-                user,
-                slaveId,
-                slavePid,
-                checkpoint,
-                containerizer,
-                lambda::_1));
-}
-
-Future<bool> ComposingContainerizerProcess::_launch(
-    const ContainerID& containerId,
-    const TaskInfo& taskInfo,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint,
-    vector<Containerizer*>::iterator containerizer,
-    bool launched)
-{
-  if (launched) {
-    containers_[*containerizer].insert(containerId);
-    return true;
-  }
-
-  // Try the next containerizer.
-  ++containerizer;
-
-  if (containerizer == containerizers_.end()) {
-    return false;
-  }
+  Container* container = new Container();
+  container->state = LAUNCHING;
+  container->containerizer = *containerizer;
+  containers_[containerId] = container;
 
   return (*containerizer)->launch(
       containerId,
@@ -468,76 +476,77 @@ Future<Nothing> ComposingContainerizerProcess::update(
     const ContainerID& containerId,
     const Resources& resources)
 {
-  foreachpair (Containerizer* containerizer,
-               const hashset<ContainerID>& containers,
-               containers_) {
-    if (containers.contains(containerId)) {
-      return containerizer->update(containerId, resources);
-    }
+  if (!containers_.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not found");
   }
 
-  return Failure("No container found");
+  return containers_[containerId]->containerizer->update(
+      containerId, resources);
 }
 
 
 Future<ResourceStatistics> ComposingContainerizerProcess::usage(
     const ContainerID& containerId)
 {
-  foreachpair (Containerizer* containerizer,
-               const hashset<ContainerID>& containers,
-               containers_) {
-    if (containers.contains(containerId)) {
-      return containerizer->usage(containerId);
-    }
+  if (!containers_.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not found");
   }
 
-  return Failure("No container found");
+  return containers_[containerId]->containerizer->usage(containerId);
 }
 
 
 Future<containerizer::Termination> ComposingContainerizerProcess::wait(
     const ContainerID& containerId)
 {
-  foreachpair (Containerizer* containerizer,
-               const hashset<ContainerID>& containers,
-               containers_) {
-    if (containers.contains(containerId)) {
-      return containerizer->wait(containerId);
-    }
+  if (!containers_.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not found");
   }
 
-  return Failure("No container found");
+  return containers_[containerId]->containerizer->wait(containerId);
 }
 
 
 void ComposingContainerizerProcess::destroy(const ContainerID& containerId)
 {
-  foreachpair (Containerizer* containerizer,
-               const hashset<ContainerID>& containers,
-               containers_) {
-    if (containers.contains(containerId)) {
-      containerizer->destroy(containerId);
-      break;
-    }
+  if (!containers_.contains(containerId)) {
+    LOG(WARNING) << "Container '" << containerId.value() << "' not found";
+    return;
   }
-}
 
+  Container* container = containers_[containerId];
 
-// TODO(benh): Move into stout/hashset.hpp.
-template <typename Elem>
-hashset<Elem> merge(const std::list<hashset<Elem> >& list)
-{
-  hashset<Elem> result;
-  foreach (const hashset<Elem>& set, list) {
-    result.insert(set.begin(), set.end());
+  if (container->state == DESTROYED) {
+    LOG(WARNING) << "Container '" << containerId.value()
+                 << "' is already destroyed";
+    return;
   }
-  return result;
+
+  // It's ok to forward destroy to any containerizer which is currently
+  // launching the container, because we expect each containerizer to
+  // handle calling destroy on non-existing container.
+  // The composing containerizer will not move to the next
+  // containerizer for a container that is destroyed as well.
+  container->containerizer->destroy(containerId);
+
+  if (container->state == LAUNCHING) {
+    // Record the fact that this container was asked to be destroyed
+    // so that we won't try and launch this container using any other
+    // containerizers in the event the current containerizer has
+    // decided it can't launch the container.
+    container->state = DESTROYED;
+    return;
+  }
+
+  // If the container is launched, then we can simply cleanup.
+  containers_.erase(containerId);
+  delete container;
 }
 
 
 Future<hashset<ContainerID> > ComposingContainerizerProcess::containers()
 {
-  return merge(containers_.values());
+  return containers_.keys();
 }
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/73161e54/src/tests/composing_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/composing_containerizer_tests.cpp b/src/tests/composing_containerizer_tests.cpp
new file mode 100644
index 0000000..5ab5a36
--- /dev/null
+++ b/src/tests/composing_containerizer_tests.cpp
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+
+#include <stout/option.hpp>
+
+#include "messages/messages.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/composing.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
+using namespace mesos::internal::tests;
+
+using namespace process;
+
+using std::vector;
+
+using testing::_;
+using testing::Return;
+
+class ComposingContainerizerTest : public MesosTest {};
+
+class MockContainerizer : public slave::Containerizer
+{
+public:
+  MOCK_METHOD1(
+      recover,
+      process::Future<Nothing>(
+          const Option<slave::state::SlaveState>&));
+
+  MOCK_METHOD7(
+      launch,
+      process::Future<bool>(
+          const ContainerID&,
+          const ExecutorInfo&,
+          const std::string&,
+          const Option<std::string>&,
+          const SlaveID&,
+          const process::PID<Slave>&,
+          bool));
+
+  MOCK_METHOD8(
+      launch,
+      process::Future<bool>(
+          const ContainerID&,
+          const TaskInfo&,
+          const ExecutorInfo&,
+          const std::string&,
+          const Option<std::string>&,
+          const SlaveID&,
+          const process::PID<Slave>&,
+          bool));
+
+  MOCK_METHOD2(
+      update,
+      process::Future<Nothing>(
+          const ContainerID&,
+          const Resources&));
+
+  MOCK_METHOD1(
+      usage,
+      process::Future<ResourceStatistics>(
+          const ContainerID&));
+
+  MOCK_METHOD1(
+      wait,
+      process::Future<containerizer::Termination>(
+          const ContainerID&));
+
+  MOCK_METHOD1(
+      destroy,
+      void(const ContainerID&));
+
+  MOCK_METHOD0(
+      containers,
+      process::Future<hashset<ContainerID> >());
+};
+
+
+// This test checks if destroy is called while container is being
+// launched, the composing containerizer still calls the underlying
+// containerizer's destroy and skip calling the rest of the
+// containerizers.
+TEST_F(ComposingContainerizerTest, DestroyWhileLaunching)
+{
+  vector<Containerizer*> containerizers;
+
+  MockContainerizer* mockContainerizer = new MockContainerizer();
+  MockContainerizer* mockContainerizer2 = new MockContainerizer();
+
+  containerizers.push_back(mockContainerizer);
+  containerizers.push_back(mockContainerizer2);
+
+  ComposingContainerizer containerizer(containerizers);
+  ContainerID containerId;
+  containerId.set_value("container");
+  TaskInfo taskInfo;
+  ExecutorInfo executorInfo;
+  SlaveID slaveId;
+  PID<Slave> slavePid;
+
+  Promise<bool> launchPromise;
+
+  EXPECT_CALL(*mockContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(Return(launchPromise.future()));
+
+  Future<Nothing> destroy;
+
+  EXPECT_CALL(*mockContainerizer, destroy(_))
+    .WillOnce(FutureSatisfy(&destroy));
+
+  Future<bool> launch = containerizer.launch(
+      containerId,
+      taskInfo,
+      executorInfo,
+      "dir",
+      "user",
+      slaveId,
+      slavePid,
+      false);
+
+  Resources resources = Resources::parse("cpus:1;mem:256").get();
+
+  EXPECT_TRUE(launch.isPending());
+
+  containerizer.destroy(containerId);
+
+  EXPECT_CALL(*mockContainerizer2, launch(_, _, _, _, _, _, _, _))
+    .Times(0);
+
+  // We make sure the destroy is being called on the first containerizer.
+  // The second containerizer shouldn't be called as well since the
+  // container is already destroyed.
+  AWAIT_READY(destroy);
+
+  launchPromise.set(false);
+  AWAIT_FAILED(launch);
+}

Reply via email to