Added a storage local resource provider test for CSI plugin restart.

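The test does the same as the `PublishResources` test, but it kills the
CSI plugin container before each operation and waits for the container
daemon to relaunch it. To let the test observe the relaunch,
`ContainerDaemonProcess` is moved into `slave/container_daemon_process.hpp`
and its `launchContainer()` and `waitContainer()` methods are made public.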

Review: https://reviews.apache.org/r/64998/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/69e5e28e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/69e5e28e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/69e5e28e

Branch: refs/heads/master
Commit: 69e5e28ed0756f94c839a453052d268696d66a33
Parents: 0e4d6f2
Author: Chun-Hung Hsiao <chhs...@mesosphere.io>
Authored: Fri Jan 19 15:36:27 2018 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Fri Jan 19 15:50:08 2018 -0800

----------------------------------------------------------------------
 src/Makefile.am                                 |   1 +
 src/slave/container_daemon.cpp                  |  47 +--
 src/slave/container_daemon_process.hpp          |  82 ++++++
 .../storage_local_resource_provider_tests.cpp   | 290 +++++++++++++++++++
 4 files changed, 375 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 191594b..fe8f689 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1190,6 +1190,7 @@ libmesos_no_3rdparty_la_SOURCES +=                        \
   slave/compatibility.hpp                                              \
   slave/constants.hpp                                                  \
   slave/container_daemon.hpp                                           \
+  slave/container_daemon_process.hpp                                   \
   slave/flags.hpp                                                      \
   slave/gc.hpp                                                         \
   slave/gc_process.hpp                                                 \

http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/slave/container_daemon.cpp
----------------------------------------------------------------------
diff --git a/src/slave/container_daemon.cpp b/src/slave/container_daemon.cpp
index d74fa51..6458d1f 100644
--- a/src/slave/container_daemon.cpp
+++ b/src/slave/container_daemon.cpp
@@ -16,20 +16,17 @@
 
 #include "slave/container_daemon.hpp"
 
-#include <mesos/agent/agent.hpp>
-
 #include <process/defer.hpp>
 #include <process/id.hpp>
-#include <process/process.hpp>
 
 #include <stout/lambda.hpp>
 #include <stout/stringify.hpp>
 #include <stout/unreachable.hpp>
 
-#include "common/http.hpp"
-
 #include "internal/evolve.hpp"
 
+#include "slave/container_daemon_process.hpp"
+
 namespace http = process::http;
 
 using std::string;
@@ -64,46 +61,6 @@ static inline http::Headers getAuthHeader(const Option<string>& authToken)
 }
 
 
-class ContainerDaemonProcess : public Process<ContainerDaemonProcess>
-{
-public:
-  explicit ContainerDaemonProcess(
-      const http::URL& _agentUrl,
-      const Option<string>& _authToken,
-      const ContainerID& containerId,
-      const Option<CommandInfo>& commandInfo,
-      const Option<Resources>& resources,
-      const Option<ContainerInfo>& containerInfo,
-      const Option<std::function<Future<Nothing>()>>& _postStartHook,
-      const Option<std::function<Future<Nothing>()>>& _postStopHook);
-
-  ContainerDaemonProcess(const ContainerDaemonProcess& other) = delete;
-
-  ContainerDaemonProcess& operator=(
-      const ContainerDaemonProcess& other) = delete;
-
-  Future<Nothing> wait();
-
-protected:
-  void initialize() override;
-
-private:
-  void launchContainer();
-  void waitContainer();
-
-  const http::URL agentUrl;
-  const Option<string> authToken;
-  const ContentType contentType;
-  const Option<std::function<Future<Nothing>()>> postStartHook;
-  const Option<std::function<Future<Nothing>()>> postStopHook;
-
-  Call launchCall;
-  Call waitCall;
-
-  Promise<Nothing> terminated;
-};
-
-
 ContainerDaemonProcess::ContainerDaemonProcess(
     const http::URL& _agentUrl,
     const Option<string>& _authToken,

http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/slave/container_daemon_process.hpp
----------------------------------------------------------------------
diff --git a/src/slave/container_daemon_process.hpp b/src/slave/container_daemon_process.hpp
new file mode 100644
index 0000000..a5d19a0
--- /dev/null
+++ b/src/slave/container_daemon_process.hpp
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__
+#define __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__
+
+#include <functional>
+#include <string>
+
+#include <mesos/mesos.hpp>
+#include <mesos/agent/agent.hpp>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+
+#include <stout/option.hpp>
+
+#include "common/http.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class ContainerDaemonProcess : public process::Process<ContainerDaemonProcess>
+{
+public:
+  explicit ContainerDaemonProcess(
+      const process::http::URL& _agentUrl,
+      const Option<std::string>& _authToken,
+      const ContainerID& containerId,
+      const Option<CommandInfo>& commandInfo,
+      const Option<Resources>& resources,
+      const Option<ContainerInfo>& containerInfo,
+      const Option<std::function<process::Future<Nothing>()>>& _postStartHook,
+      const Option<std::function<process::Future<Nothing>()>>& _postStopHook);
+
+  ContainerDaemonProcess(const ContainerDaemonProcess& other) = delete;
+
+  ContainerDaemonProcess& operator=(
+      const ContainerDaemonProcess& other) = delete;
+
+  process::Future<Nothing> wait();
+
+  // Made public so tests can intercept dispatches to them (FUTURE_DISPATCH).
+  void launchContainer();
+  void waitContainer();
+
+protected:
+  void initialize() override;
+
+private:
+  const process::http::URL agentUrl;
+  const Option<std::string> authToken;
+  const ContentType contentType;
+  const Option<std::function<process::Future<Nothing>()>> postStartHook;
+  const Option<std::function<process::Future<Nothing>()>> postStopHook;
+
+  agent::Call launchCall;
+  agent::Call waitCall;
+
+  process::Promise<Nothing> terminated;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index dfe4faf..1b21527 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -24,6 +24,12 @@
 
 #include "module/manager.hpp"
 
+#include "slave/container_daemon_process.hpp"
+
+#include "slave/containerizer/fetcher.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
@@ -31,6 +37,8 @@ using std::shared_ptr;
 using std::string;
 using std::vector;
 
+using mesos::internal::slave::ContainerDaemonProcess;
+
 using mesos::master::detector::MasterDetector;
 
 using mesos::v1::resource_provider::Call;
@@ -1449,6 +1457,288 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
 
 
 // This test verifies that the storage local resource provider can
+// restart its CSI plugin after it is killed and continue to work
+// properly.
+TEST_F(
+    StorageLocalResourceProviderTest,
+    ROOT_PublishUnpublishResourcesPluginKilled)
+{
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  slave::Fetcher fetcher(slaveFlags);
+
+  Try<slave::MesosContainerizer*> _containerizer =
+    slave::MesosContainerizer::create(slaveFlags, false, &fetcher);
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::MesosContainerizer> containerizer(_containerizer.get());
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  // We use the following filter so that the resources will not be
+  // filtered for 5 seconds (the default).
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing the persistent volume after `CREATE` and `LAUNCH`.
+  //   4. One containing the same RAW disk resource after `DESTROY` and
+  //      `DESTROY_VOLUME`.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> taskFinishedOffers;
+  Future<vector<Offer>> volumeDestroyedOffers;
+
+  Sequence offers;
+
+  // We are only interested in storage pools and volumes created from
+  // them, which have a "volume-default" profile.
+  auto hasSourceType = [](
+      const Resource& r,
+      const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == "volume-default" &&
+      r.disk().source().type() == type;
+  };
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  // Get the ID of the CSI plugin container, the only container so far.
+  Future<hashset<ContainerID>> pluginContainers = containerizer->containers();
+
+  AWAIT_READY(pluginContainers);
+  ASSERT_EQ(1u, pluginContainers->size());
+
+  const ContainerID& pluginContainerId = *pluginContainers->begin();
+
+  Future<Nothing> pluginRestarted =
+    FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+  // Kill the plugin container and wait for the daemon to relaunch it.
+  Future<int> pluginKilled = containerizer->status(pluginContainerId)
+    .then([](const ContainerStatus& status) {
+      return os::kill(status.executor_pid(), SIGKILL);
+    });
+
+  AWAIT_ASSERT_EQ(0, pluginKilled);
+  AWAIT_READY(pluginRestarted);
+
+  // Create a volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      acceptFilters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  Option<Resource> volume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+      volume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(volume);
+  ASSERT_TRUE(volume->disk().source().has_id());
+  ASSERT_TRUE(volume->disk().source().has_metadata());
+  ASSERT_TRUE(volume->disk().source().has_mount());
+  ASSERT_TRUE(volume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+  // Check if the volume is actually created by the test CSI plugin.
+  Option<string> volumePath;
+
+  foreach (const Label& label, volume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  EXPECT_TRUE(os::exists(volumePath.get()));
+
+  pluginRestarted =
+    FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+  // Kill the plugin container and wait for the daemon to relaunch it.
+  pluginKilled = containerizer->status(pluginContainerId)
+    .then([](const ContainerStatus& status) {
+      return os::kill(status.executor_pid(), SIGKILL);
+    });
+
+  AWAIT_ASSERT_EQ(0, pluginKilled);
+  AWAIT_READY(pluginRestarted);
+
+  // Put a file into the volume.
+  ASSERT_SOME(os::touch(path::join(volumePath.get(), "file")));
+
+  // Create a persistent volume on the CSI volume, then launch a task to
+  // use the persistent volume.
+  Resource persistentVolume = volume.get();
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_id(id::UUID::random().toString());
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_principal(framework.principal());
+  persistentVolume.mutable_disk()->mutable_volume()
+    ->set_container_path("volume");
+  persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+  Future<TaskStatus> taskStarting;
+  Future<TaskStatus> taskRunning;
+  Future<TaskStatus> taskFinished;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskStarting))
+    .WillOnce(FutureArg<1>(&taskRunning))
+    .WillOnce(FutureArg<1>(&taskFinished));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(
+      persistentVolume)))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&taskFinishedOffers));
+
+  driver.acceptOffers(
+      {volumeCreatedOffers->at(0).id()},
+      {CREATE(persistentVolume),
+       LAUNCH({createTask(
+           volumeCreatedOffers->at(0).slave_id(),
+           persistentVolume,
+           createCommandInfo("test -f " + path::join("volume", "file")))})},
+      acceptFilters);
+
+  AWAIT_READY(taskStarting);
+  EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+  AWAIT_READY(taskRunning);
+  EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+  AWAIT_READY(taskFinished);
+  EXPECT_EQ(TASK_FINISHED, taskFinished->state());
+
+  AWAIT_READY(taskFinishedOffers);
+
+  pluginRestarted =
+    FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+  // Kill the plugin container and wait for the daemon to relaunch it.
+  pluginKilled = containerizer->status(pluginContainerId)
+    .then([](const ContainerStatus& status) {
+      return os::kill(status.executor_pid(), SIGKILL);
+    });
+
+  AWAIT_ASSERT_EQ(0, pluginKilled);
+  AWAIT_READY(pluginRestarted);
+
+  // Destroy the persistent volume and the CSI volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get())))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
+
+  driver.acceptOffers(
+      {taskFinishedOffers->at(0).id()},
+      {DESTROY(persistentVolume),
+       DESTROY_VOLUME(volume.get())},
+      acceptFilters);
+
+  AWAIT_READY(volumeDestroyedOffers);
+  ASSERT_FALSE(volumeDestroyedOffers->empty());
+
+  // Check if the volume is actually deleted by the test CSI plugin.
+  EXPECT_FALSE(os::exists(volumePath.get()));
+}
+
+
+// This test verifies that the storage local resource provider can
 // convert pre-existing CSI volumes into mount or block volumes.
 TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
 {

Reply via email to