This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit fea9905c08f8c30090652a6047e3f3c242b46781
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Mon Apr 1 23:23:38 2019 -0700

    Refactored SLRP with `ServiceManager` to manage container lifecycles.
    
    Container management is moved out of SLRP into `ServiceManager`. It is
    agnostic to CSI versions, so it can be used to manage plugin containers
    for both CSI v0 and v1 plugins.
    
    This patch squashes the changes from r/70169.
    
    Review: https://reviews.apache.org/r/70168/
---
 src/CMakeLists.txt                                 |   1 +
 src/Makefile.am                                    |   2 +
 src/csi/service_manager.cpp                        | 726 +++++++++++++++++++++
 src/csi/service_manager.hpp                        |  85 +++
 src/resource_provider/storage/provider.cpp         | 630 ++----------------
 src/resource_provider/storage/provider_process.hpp |  40 +-
 6 files changed, 868 insertions(+), 616 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 53b0e7b..f1c3114 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -244,6 +244,7 @@ set(CSI_SRC
   csi/metrics.cpp
   csi/paths.cpp
   csi/rpc.cpp
+  csi/service_manager.cpp
   csi/utils.cpp)
 
 set(DOCKER_SRC
diff --git a/src/Makefile.am b/src/Makefile.am
index 7ef8825..ea8e176 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1560,6 +1560,8 @@ libcsi_la_SOURCES =                                       
                \
   csi/paths.hpp                                                                
\
   csi/rpc.cpp                                                          \
   csi/rpc.hpp                                                          \
+  csi/service_manager.cpp                                              \
+  csi/service_manager.hpp                                              \
   csi/state.hpp                                                                
\
   csi/state.proto                                                      \
   csi/utils.cpp                                                                
\
diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp
new file mode 100644
index 0000000..27de0c9
--- /dev/null
+++ b/src/csi/service_manager.cpp
@@ -0,0 +1,726 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "csi/service_manager.hpp"
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include <utility>
+#include <vector>
+
+#include <mesos/http.hpp>
+
+#include <mesos/agent/agent.hpp>
+
+#include <process/after.hpp>
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/id.hpp>
+#include <process/loop.hpp>
+#include <process/process.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+#include <stout/try.hpp>
+
+#include <stout/os/realpath.hpp>
+
+#include "common/http.hpp"
+
+#include "csi/client.hpp"
+#include "csi/paths.hpp"
+#include "csi/utils.hpp"
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "slave/container_daemon.hpp"
+#include "slave/state.hpp"
+
+namespace http = process::http;
+namespace slave = mesos::internal::slave;
+
+using std::list;
+using std::string;
+using std::vector;
+
+using process::Break;
+using process::Continue;
+using process::ControlFlow;
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Process;
+using process::ProcessBase;
+using process::Promise;
+using process::Timeout;
+
+using process::grpc::StatusError;
+
+using process::grpc::client::Runtime;
+
+using slave::ContainerDaemon;
+
+namespace mesos {
+namespace csi {
+
+// Timeout for a CSI plugin component to create its endpoint socket. It bounds
+// how long `waitEndpoint` polls for the socket file before failing.
+//
+// TODO(chhsiao): Make the timeout configurable.
+constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
+
+
+// Builds the ID of the container that runs the given CSI plugin component.
+// The ID has the following format:
+//
+//     <container_prefix><plugin_type>-<plugin_name>--<list_of_services>
+//
+// where <plugin_type> is the type of the CSI plugin with dots replaced by
+// dashes, <plugin_name> is the name of the CSI plugin, and <list_of_services>
+// lists the CSI services provided by the component, joined with dashes.
+static ContainerID getContainerId(
+    const CSIPluginInfo& info,
+    const string& containerPrefix,
+    const CSIPluginContainerInfo& container)
+{
+  const string pluginPart =
+    strings::join("-", strings::replace(info.type(), ".", "-"), info.name());
+
+  const string servicesPart = strings::join("-", container.services());
+
+  ContainerID containerId;
+  containerId.set_value(containerPrefix + pluginPart + "--" + servicesPart);
+
+  return containerId;
+}
+
+
+// Process that backs `ServiceManager`: it keeps track of the containers of one
+// CSI plugin and hands out the endpoints of the services they provide.
+class ServiceManagerProcess : public Process<ServiceManagerProcess>
+{
+public:
+  // `services` lists the CSI services this manager must be able to serve. The
+  // constructor aborts (via `CHECK`) if `_info` has no container providing
+  // one of them.
+  ServiceManagerProcess(
+      const http::URL& _agentUrl,
+      const string& _rootDir,
+      const CSIPluginInfo& _info,
+      const hashset<Service>& services,
+      const string& _containerPrefix,
+      const Option<string>& _authToken,
+      const Runtime& _runtime,
+      Metrics* _metrics);
+
+  // Cleans up every checkpointed plugin container that is not a running,
+  // up-to-date container of a requested service: it is killed if running,
+  // waited on if the agent knows about it, and then its endpoint directory
+  // and container directory are removed.
+  Future<Nothing> recover();
+
+  // Returns the endpoint of the container chosen to serve the specified
+  // service, or a failure if no container was chosen for that service.
+  Future<string> getServiceEndpoint(const Service& service);
+
+private:
+  // Returns the container info of the specified container for this CSI plugin.
+  Option<CSIPluginContainerInfo> getContainerInfo(
+      const ContainerID& containerId);
+
+  // Returns a map of any existing container of this CSI plugin to its status,
+  // or `None` if it does not have a status (e.g., being destroyed).
+  Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
+
+  // Waits for the specified plugin container to be terminated.
+  Future<Nothing> waitContainer(const ContainerID& containerId);
+
+  // Kills the specified plugin container.
+  Future<Nothing> killContainer(const ContainerID& containerId);
+
+  // Waits for the endpoint (URI to a Unix domain socket) to be ready.
+  Future<Nothing> waitEndpoint(const string& endpoint);
+
+  // Returns the URI of the latest service endpoint for the specified plugin
+  // container. If the container is not already running, this method will start
+  // a new container.
+  Future<string> getEndpoint(const ContainerID& containerId);
+
+  const http::URL agentUrl;
+  const string rootDir;
+  const CSIPluginInfo info;
+  const string containerPrefix;
+  const Option<string> authToken;
+  const ContentType contentType;
+
+  Runtime runtime;
+  Metrics* metrics;
+
+  // Headers sent with the agent API calls made by this process.
+  http::Headers headers;
+
+  // The container chosen by the constructor to serve each requested service.
+  hashmap<Service, ContainerID> serviceContainers;
+
+  // The daemon and the endpoint promise of each container set up by
+  // `getEndpoint`. The promise is reset by the hooks given to the daemon.
+  hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
+  hashmap<ContainerID, Owned<Promise<string>>> endpoints;
+};
+
+
+ServiceManagerProcess::ServiceManagerProcess(
+    const http::URL& _agentUrl,
+    const string& _rootDir,
+    const CSIPluginInfo& _info,
+    const hashset<Service>& services,
+    const string& _containerPrefix,
+    const Option<string>& _authToken,
+    const Runtime& _runtime,
+    Metrics* _metrics)
+  : ProcessBase(process::ID::generate("csi-service-manager")),
+    agentUrl(_agentUrl),
+    rootDir(_rootDir),
+    info(_info),
+    containerPrefix(_containerPrefix),
+    authToken(_authToken),
+    contentType(ContentType::PROTOBUF),
+    runtime(_runtime),
+    metrics(_metrics)
+{
+  // Agent API calls accept `contentType` and, if a token is given, carry it
+  // as a bearer credential.
+  headers["Accept"] = stringify(contentType);
+  if (authToken.isSome()) {
+    headers["Authorization"] = "Bearer " + authToken.get();
+  }
+
+  // Choose the container serving each requested service: the first container
+  // in the plugin info that lists the service wins. See `CSIPluginInfo` in
+  // `mesos.proto` for details.
+  foreach (const Service& service, services) {
+    foreach (const CSIPluginContainerInfo& container, info.containers()) {
+      const auto& provided = container.services();
+      if (std::find(provided.begin(), provided.end(), service) ==
+          provided.end()) {
+        continue;
+      }
+
+      serviceContainers[service] =
+        getContainerId(info, containerPrefix, container);
+
+      break;
+    }
+
+    // A requested service that no container provides is a fatal error.
+    CHECK(serviceContainers.contains(service))
+      << service << " not found for CSI plugin type '" << info.type()
+      << "' and name '" << info.name() << "'";
+  }
+}
+
+
+// Cleans up the checkpointed plugin containers that should no longer exist.
+// A container is kept only if it is running, serves one of the requested
+// services, and its checkpointed config equals the current one. Any other
+// container is killed if running, waited on if the agent knows about it, and
+// then has its endpoint directory and container directory removed.
+Future<Nothing> ServiceManagerProcess::recover()
+{
+  return getContainers()
+    .then(process::defer(self(), [=](
+        const hashmap<ContainerID, Option<ContainerStatus>>& containers)
+        -> Future<Nothing> {
+      Try<list<string>> containerPaths =
+        paths::getContainerPaths(rootDir, info.type(), info.name());
+
+      if (containerPaths.isError()) {
+        return Failure(
+            "Failed to find service container paths for CSI plugin type '" +
+            info.type() + "' and name '" + info.name() +
+            "': " + containerPaths.error());
+      }
+
+      // One cleanup future per checkpointed container that has to go away.
+      vector<Future<Nothing>> cleanups;
+
+      foreach (const string& path, containerPaths.get()) {
+        Try<paths::ContainerPath> parsed =
+          paths::parseContainerPath(rootDir, path);
+
+        if (parsed.isError()) {
+          return Failure(
+              "Failed to parse service container path '" + path +
+              "': " + parsed.error());
+        }
+
+        CHECK_EQ(info.type(), parsed->type);
+        CHECK_EQ(info.name(), parsed->name);
+
+        const ContainerID& containerId = parsed->containerId;
+
+        // Whether the agent reported this container at all.
+        const bool known = containers.contains(containerId);
+
+        // NOTE: Since `GET_CONTAINERS` might return containers that are being
+        // destroyed, to identify if the container is actually running, we
+        // check if the `executor_pid` field is set as a workaround.
+        bool running = false;
+        if (known) {
+          const Option<ContainerStatus>& status = containers.at(containerId);
+          running = status.isSome() && status->has_executor_pid();
+        }
+
+        // Do not kill the up-to-date running controller or node container.
+        if (running && serviceContainers.contains_value(containerId)) {
+          const string configPath = paths::getContainerInfoPath(
+              rootDir, info.type(), info.name(), containerId);
+
+          if (os::exists(configPath)) {
+            Result<CSIPluginContainerInfo> config =
+              slave::state::read<CSIPluginContainerInfo>(configPath);
+
+            if (config.isError()) {
+              return Failure(
+                  "Failed to read service container config from '" +
+                  configPath + "': " + config.error());
+            }
+
+            if (config.isSome() &&
+                getContainerInfo(containerId) == config.get()) {
+              continue;
+            }
+          }
+        }
+
+        LOG(INFO) << "Cleaning up plugin container '" << containerId << "'";
+
+        // Otherwise, kill the container if it is running. We always wait for
+        // the container to be destroyed before performing the cleanup even if
+        // it is not killed here.
+        Future<Nothing> cleanup =
+          running ? killContainer(containerId) : Future<Nothing>(Nothing());
+
+        if (known) {
+          cleanup = cleanup
+            .then(process::defer(self(), &Self::waitContainer, containerId));
+        }
+
+        // Finally remove what the container left on disk: the directory its
+        // endpoint symlink resolves to (if any), then its own directory.
+        cleanup = cleanup
+          .then(process::defer(self(), [=]() -> Future<Nothing> {
+            Result<string> endpointDir =
+              os::realpath(paths::getEndpointDirSymlinkPath(
+                  rootDir, info.type(), info.name(), containerId));
+
+            if (endpointDir.isSome()) {
+              Try<Nothing> endpointDirRemoved = os::rmdir(endpointDir.get());
+              if (endpointDirRemoved.isError()) {
+                return Failure(
+                    "Failed to remove endpoint directory '" +
+                    endpointDir.get() + "': " + endpointDirRemoved.error());
+              }
+            }
+
+            Try<Nothing> containerDirRemoved = os::rmdir(path);
+            if (containerDirRemoved.isError()) {
+              return Failure(
+                  "Failed to remove plugin container directory '" + path +
+                  "': " + containerDirRemoved.error());
+            }
+
+            return Nothing();
+          }));
+
+        cleanups.push_back(cleanup);
+      }
+
+      return process::collect(cleanups).then([] { return Nothing(); });
+    }));
+}
+
+
+// Returns the endpoint of the container chosen to serve `service`, or a
+// failure if no container was chosen for it at construction.
+Future<string> ServiceManagerProcess::getServiceEndpoint(
+    const Service& service)
+{
+  if (serviceContainers.contains(service)) {
+    return getEndpoint(serviceContainers.at(service));
+  }
+
+  return Failure(
+      stringify(service) + " not found for CSI plugin type '" + info.type() +
+      "' and name '" + info.name() + "'");
+}
+
+
+// Looks up the container config in the plugin info whose derived container
+// ID equals `containerId`. Returns the first match, or `None` if there is no
+// such config.
+Option<CSIPluginContainerInfo> ServiceManagerProcess::getContainerInfo(
+    const ContainerID& containerId)
+{
+  const auto& candidates = info.containers();
+
+  const auto match = std::find_if(
+      candidates.begin(),
+      candidates.end(),
+      [&](const CSIPluginContainerInfo& entry) {
+        return getContainerId(info, containerPrefix, entry) == containerId;
+      });
+
+  if (match == candidates.end()) {
+    return None();
+  }
+
+  return *match;
+}
+
+
+// Queries the agent with `GET_CONTAINERS` and returns the containers whose ID
+// starts with `containerPrefix`, each mapped to its status, or to `None` when
+// the agent reported no status for it.
+Future<hashmap<ContainerID, Option<ContainerStatus>>>
+ServiceManagerProcess::getContainers()
+{
+  // List standalone containers but not nested ones.
+  agent::Call call;
+  call.set_type(agent::Call::GET_CONTAINERS);
+
+  auto* options = call.mutable_get_containers();
+  options->set_show_nested(false);
+  options->set_show_standalone(true);
+
+  return http::post(
+      agentUrl,
+      headers,
+      internal::serialize(contentType, internal::evolve(call)),
+      stringify(contentType))
+    .then(process::defer(self(), [this](const http::Response& httpResponse)
+        -> Future<hashmap<ContainerID, Option<ContainerStatus>>> {
+      if (httpResponse.status != http::OK().status) {
+        return Failure(
+            "Failed to get containers: Unexpected response '" +
+            httpResponse.status + "' (" + httpResponse.body + ")");
+      }
+
+      Try<v1::agent::Response> v1Response =
+        internal::deserialize<v1::agent::Response>(
+            contentType, httpResponse.body);
+
+      if (v1Response.isError()) {
+        return Failure("Failed to get containers: " + v1Response.error());
+      }
+
+      hashmap<ContainerID, Option<ContainerStatus>> statuses;
+
+      const agent::Response response = internal::devolve(v1Response.get());
+      foreach (const agent::Response::GetContainers::Container& container,
+               response.get_containers().containers()) {
+        const ContainerID& containerId = container.container_id();
+
+        // Container IDs of this CSI plugin must contain the given prefix. See
+        // `LocalResourceProvider::principal` for details.
+        if (!strings::startsWith(containerId.value(), containerPrefix)) {
+          continue;
+        }
+
+        Option<ContainerStatus> status = None();
+        if (container.has_container_status()) {
+          status = container.container_status();
+        }
+
+        statuses.put(containerId, status);
+      }
+
+      return std::move(statuses);
+    }));
+}
+
+
+// Sends `WAIT_CONTAINER` for the specified container to the agent. Both an
+// `OK` and a `NotFound` response complete the returned future; any other
+// response fails it.
+Future<Nothing> ServiceManagerProcess::waitContainer(
+    const ContainerID& containerId)
+{
+  agent::Call call;
+  call.set_type(agent::Call::WAIT_CONTAINER);
+
+  ContainerID* target = call.mutable_wait_container()->mutable_container_id();
+  target->CopyFrom(containerId);
+
+  return http::post(
+      agentUrl,
+      headers,
+      internal::serialize(contentType, internal::evolve(call)),
+      stringify(contentType))
+    .then([containerId](const http::Response& response) -> Future<Nothing> {
+      const bool accepted =
+        response.status == http::OK().status ||
+        response.status == http::NotFound().status;
+
+      if (accepted) {
+        return Nothing();
+      }
+
+      return Failure(
+          "Failed to wait for container '" + stringify(containerId) +
+          "': Unexpected response '" + response.status + "' (" +
+          response.body + ")");
+    });
+}
+
+
+// Sends `KILL_CONTAINER` for the specified container to the agent. Both an
+// `OK` and a `NotFound` response complete the returned future; any other
+// response fails it.
+Future<Nothing> ServiceManagerProcess::killContainer(
+    const ContainerID& containerId)
+{
+  agent::Call call;
+  call.set_type(agent::Call::KILL_CONTAINER);
+
+  ContainerID* target = call.mutable_kill_container()->mutable_container_id();
+  target->CopyFrom(containerId);
+
+  return http::post(
+      agentUrl,
+      headers,
+      internal::serialize(contentType, internal::evolve(call)),
+      stringify(contentType))
+    .then([containerId](const http::Response& response) -> Future<Nothing> {
+      const bool accepted =
+        response.status == http::OK().status ||
+        response.status == http::NotFound().status;
+
+      if (accepted) {
+        return Nothing();
+      }
+
+      return Failure(
+          "Failed to kill container '" + stringify(containerId) +
+          "': Unexpected response '" + response.status + "' (" +
+          response.body + ")");
+    });
+}
+
+
+// Waits until the Unix domain socket behind `endpoint` exists, then probes
+// the plugin through it. `endpoint` must start with "unix://". The returned
+// future fails if the socket does not show up within
+// `CSI_ENDPOINT_CREATION_TIMEOUT` or if the probe fails.
+Future<Nothing> ServiceManagerProcess::waitEndpoint(const string& endpoint)
+{
+  CHECK(strings::startsWith(endpoint, "unix://"));
+  const string socketPath =
+    strings::remove(endpoint, "unix://", strings::PREFIX);
+
+  Future<Nothing> socketCreated = Nothing();
+  if (!os::exists(socketPath)) {
+    // Wait for the endpoint socket to appear until the timeout expires,
+    // checking for it every 10 milliseconds.
+    Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
+
+    socketCreated = process::loop(
+        [=]() -> Future<Nothing> {
+          if (!timeout.expired()) {
+            return process::after(Milliseconds(10));
+          }
+
+          return Failure(
+              "Timed out waiting for endpoint '" + endpoint + "'");
+        },
+        [=](const Nothing&) -> ControlFlow<Nothing> {
+          if (!os::exists(socketPath)) {
+            return Continue();
+          }
+
+          return Break();
+        });
+  }
+
+  return socketCreated
+    .then(process::defer(self(), [=]() -> Future<Nothing> {
+      // TODO(chhsiao): Detect which CSI version to use through versioned
+      // `Probe` calls to support CSI v1 in a backward compatible way.
+      ++metrics->csi_plugin_rpcs_pending.at(v0::PROBE);
+
+      return v0::Client(endpoint, runtime)
+        .call<v0::PROBE>(v0::ProbeRequest())
+        .then(process::defer(self(), [=](
+            const Try<v0::ProbeResponse, StatusError>& probed)
+            -> Future<Nothing> {
+          if (probed.isError()) {
+            return Failure(
+                "Failed to probe endpoint '" + endpoint +
+                "': " + stringify(probed.error()));
+          }
+
+          return Nothing();
+        }))
+        .onAny(process::defer(self(), [this](const Future<Nothing>& probe) {
+          // Settle the RPC metrics whatever the outcome of the probe.
+          --metrics->csi_plugin_rpcs_pending.at(v0::PROBE);
+          if (probe.isReady()) {
+            ++metrics->csi_plugin_rpcs_successes.at(v0::PROBE);
+          } else if (probe.isDiscarded()) {
+            ++metrics->csi_plugin_rpcs_cancelled.at(v0::PROBE);
+          } else {
+            ++metrics->csi_plugin_rpcs_errors.at(v0::PROBE);
+          }
+        }));
+    }));
+}
+
+
+// Returns the endpoint URI ("unix://...") of the specified plugin container.
+//
+// The first call for a container checkpoints its config, prepares its command
+// and volumes, and creates a container daemon for it. Later calls return the
+// future of the container's current endpoint promise. The promise is
+// associated with the endpoint by the first hook given to the daemon and is
+// replaced with a fresh one by the second hook.
+//
+// A failure is returned if the config cannot be checkpointed, the endpoint
+// path cannot be resolved, the mount root directory cannot be created, or the
+// daemon cannot be created. None of these leaves an endpoint promise behind,
+// so a later call starts over.
+Future<string> ServiceManagerProcess::getEndpoint(
+    const ContainerID& containerId)
+{
+  if (endpoints.contains(containerId)) {
+    return endpoints.at(containerId)->future();
+  }
+
+  CHECK(!daemons.contains(containerId));
+
+  Option<CSIPluginContainerInfo> config = getContainerInfo(containerId);
+  CHECK_SOME(config);
+
+  // We checkpoint the config first to keep track of the plugin container even
+  // if we fail to create its container daemon.
+  const string configPath = paths::getContainerInfoPath(
+      rootDir, info.type(), info.name(), containerId);
+
+  Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get());
+  if (checkpoint.isError()) {
+    return Failure(
+        "Failed to checkpoint plugin container config to '" + configPath +
+        "': " + checkpoint.error());
+  }
+
+  CommandInfo commandInfo;
+
+  if (config->has_command()) {
+    commandInfo = config->command();
+  }
+
+  // Set the `CSI_ENDPOINT` environment variable.
+  Try<string> endpointPath = paths::getEndpointSocketPath(
+      rootDir, info.type(), info.name(), containerId);
+
+  if (endpointPath.isError()) {
+    return Failure(
+        "Failed to resolve endpoint path for plugin container '" +
+        stringify(containerId) + "': " + endpointPath.error());
+  }
+
+  const string endpoint = "unix://" + endpointPath.get();
+  Environment::Variable* endpoint_ =
+    commandInfo.mutable_environment()->add_variables();
+  endpoint_->set_name("CSI_ENDPOINT");
+  endpoint_->set_value(endpoint);
+
+  ContainerInfo containerInfo;
+
+  if (config->has_container()) {
+    containerInfo = config->container();
+  } else {
+    containerInfo.set_type(ContainerInfo::MESOS);
+  }
+
+  // Prepare a volume where the endpoint socket will be placed.
+  const string endpointDir = Path(endpointPath.get()).dirname();
+  Volume* endpointVolume = containerInfo.add_volumes();
+  endpointVolume->set_mode(Volume::RW);
+  endpointVolume->set_container_path(endpointDir);
+  endpointVolume->set_host_path(endpointDir);
+
+  // Prepare the directory where the mount points will be placed.
+  const string mountRootDir =
+    paths::getMountRootDir(rootDir, info.type(), info.name());
+
+  Try<Nothing> mkdir = os::mkdir(mountRootDir);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create directory '" + mountRootDir + "': " + mkdir.error());
+  }
+
+  // Prepare a volume where the mount points will be placed.
+  Volume* mountVolume = containerInfo.add_volumes();
+  mountVolume->set_mode(Volume::RW);
+  mountVolume->set_container_path(mountRootDir);
+  mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH);
+  mountVolume->mutable_source()->mutable_host_path()->set_path(mountRootDir);
+  mountVolume->mutable_source()->mutable_host_path()
+    ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
+
+  // The promise must exist before the daemon is created since the hooks
+  // below look it up in `endpoints`.
+  CHECK(!endpoints.contains(containerId));
+  endpoints[containerId].reset(new Promise<string>());
+
+  Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
+      agentUrl,
+      authToken,
+      containerId,
+      std::move(commandInfo),
+      config->resources(),
+      std::move(containerInfo),
+      // First hook: wait for the endpoint to be ready and fulfill the current
+      // promise with it.
+      std::function<Future<Nothing>()>(
+          process::defer(self(), [=]() -> Future<Nothing> {
+            LOG(INFO)
+              << "Connecting to endpoint '" << endpoint
+              << "' of CSI plugin container " << containerId;
+
+            CHECK(endpoints.at(containerId)->associate(
+                waitEndpoint(endpoint)
+                  .then([endpoint]() -> string { return endpoint; })));
+
+            return endpoints.at(containerId)->future().then([] {
+              return Nothing();
+            });
+          })),
+      // Second hook: discard the current promise, replace it with a fresh one
+      // for the next run, and remove the stale endpoint socket.
+      std::function<Future<Nothing>()>(
+          process::defer(self(), [=]() -> Future<Nothing> {
+            ++metrics->csi_plugin_container_terminations;
+
+            endpoints.at(containerId)->discard();
+            endpoints.at(containerId).reset(new Promise<string>());
+
+            LOG(INFO)
+              << "Disconnected from endpoint '" << endpoint
+              << "' of CSI plugin container " << containerId;
+
+            const string endpointPath =
+              strings::remove(endpoint, "unix://", strings::PREFIX);
+
+            if (os::exists(endpointPath)) {
+              Try<Nothing> rm = os::rm(endpointPath);
+              if (rm.isError()) {
+                return Failure(
+                    "Failed to remove endpoint socket '" + endpointPath +
+                    "': " + rm.error());
+              }
+            }
+
+            return Nothing();
+          })));
+
+  if (daemon.isError()) {
+    // No daemon exists to run the hooks above, so the promise created for
+    // them would stay pending forever and every later call for this
+    // container would return its never-completing future. Drop it so that
+    // the failure is reported and a later call can start over.
+    endpoints.erase(containerId);
+
+    return Failure(
+        "Failed to create container daemon for plugin container '" +
+        stringify(containerId) + "': " + daemon.error());
+  }
+
+  daemon.get()->wait()
+    .recover(process::defer(self(), [this, containerId](
+        const Future<Nothing>& future) {
+      LOG(ERROR)
+        << "Container daemon for '" << containerId << "' failed: "
+        << (future.isFailed() ? future.failure() : "future discarded");
+
+      // Fail or discard the corresponding endpoint promise if it has not been
+      // associated by the post-start hook above yet.
+      endpoints.at(containerId)->associate(
+          future.then([]() -> string { UNREACHABLE(); }));
+
+      return future;
+    }));
+
+  daemons.put(containerId, std::move(daemon.get()));
+
+  return endpoints.at(containerId)->future();
+}
+
+
+// Creates and spawns the backing process, then immediately dispatches its
+// recovery. The resulting future is kept in `recovered`.
+ServiceManager::ServiceManager(
+    const http::URL& agentUrl,
+    const string& rootDir,
+    const CSIPluginInfo& info,
+    const hashset<Service>& services,
+    const string& containerPrefix,
+    const Option<string>& authToken,
+    const Runtime& runtime,
+    Metrics* metrics)
+  : process(new ServiceManagerProcess(
+        agentUrl,
+        rootDir,
+        info,
+        services,
+        containerPrefix,
+        authToken,
+        runtime,
+        metrics))
+{
+  ServiceManagerProcess* const manager = CHECK_NOTNULL(process.get());
+
+  process::spawn(manager);
+  recovered = process::dispatch(manager, &ServiceManagerProcess::recover);
+}
+
+
+ServiceManager::~ServiceManager()
+{
+  // Request a discard of the recovery started by the constructor, then
+  // terminate the process and wait for it before `process` is destroyed.
+  recovered.discard();
+  process::terminate(process.get());
+  process::wait(process.get());
+}
+
+
+Future<Nothing> ServiceManager::recover()
+{
+  // Recovery is dispatched once by the constructor; this only exposes its
+  // future and does not start another recovery.
+  return recovered;
+}
+
+
+Future<string> ServiceManager::getServiceEndpoint(const Service& service)
+{
+  // Chain on `recovered` so that the process is asked for the endpoint only
+  // after the recovery dispatched by the constructor has completed.
+  return recovered
+    .then(process::defer(
+        process.get(), &ServiceManagerProcess::getServiceEndpoint, service));
+}
+
+} // namespace csi {
+} // namespace mesos {
diff --git a/src/csi/service_manager.hpp b/src/csi/service_manager.hpp
new file mode 100644
index 0000000..5dd6bc7
--- /dev/null
+++ b/src/csi/service_manager.hpp
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __CSI_SERVICE_MANAGER_HPP__
+#define __CSI_SERVICE_MANAGER_HPP__
+
+#include <string>
+
+#include <mesos/mesos.hpp>
+
+#include <process/future.hpp>
+#include <process/grpc.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+
+#include <stout/hashset.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+
+#include "csi/metrics.hpp"
+
+namespace mesos {
+namespace csi {
+
+// Shorthand for the CSI service enum declared in `CSIPluginContainerInfo`.
+using Service = CSIPluginContainerInfo::Service;
+
+// Shorthands for the enum values of the controller and node services.
+constexpr Service CONTROLLER_SERVICE =
+  CSIPluginContainerInfo::CONTROLLER_SERVICE;
+
+constexpr Service NODE_SERVICE = CSIPluginContainerInfo::NODE_SERVICE;
+
+
+// Forward declarations.
+class ServiceManagerProcess;
+
+
+// Manages the service containers of a CSI plugin instance.
+//
+// This class is a thin wrapper around a `ServiceManagerProcess`, which it
+// spawns on construction and terminates on destruction.
+class ServiceManager
+{
+public:
+  // Spawns the underlying process and immediately starts its recovery.
+  //
+  // `services` lists the CSI services that will be requested through
+  // `getServiceEndpoint`. `metrics` is passed on as a raw pointer.
+  // NOTE(review): presumably `metrics` must outlive this object; confirm
+  // with the callers.
+  ServiceManager(
+      const process::http::URL& agentUrl,
+      const std::string& rootDir,
+      const CSIPluginInfo& info,
+      const hashset<Service>& services,
+      const std::string& containerPrefix,
+      const Option<std::string>& authToken,
+      const process::grpc::client::Runtime& runtime,
+      Metrics* metrics);
+
+  // Since this class contains `Owned` members which should not but can be
+  // copied, explicitly make this class non-copyable.
+  //
+  // TODO(chhsiao): Remove this once MESOS-5122 is fixed.
+  ServiceManager(const ServiceManager&) = delete;
+  ServiceManager& operator=(const ServiceManager&) = delete;
+
+  ~ServiceManager();
+
+  // Returns the future of the recovery started by the constructor.
+  process::Future<Nothing> recover();
+
+  // Returns the endpoint of the container serving the specified service. The
+  // request is forwarded to the process only after recovery has completed.
+  process::Future<std::string> getServiceEndpoint(const Service& service);
+
+private:
+  process::Owned<ServiceManagerProcess> process;
+
+  // Result of the recovery dispatched by the constructor.
+  process::Future<Nothing> recovered;
+};
+
+} // namespace csi {
+} // namespace mesos {
+
+#endif // __CSI_SERVICE_MANAGER_HPP__
diff --git a/src/resource_provider/storage/provider.cpp 
b/src/resource_provider/storage/provider.cpp
index f307f8e..7c14ff1 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -41,7 +41,6 @@
 #include <process/loop.hpp>
 #include <process/process.hpp>
 #include <process/sequence.hpp>
-#include <process/timeout.hpp>
 
 #include <process/metrics/counter.hpp>
 #include <process/metrics/metrics.hpp>
@@ -72,7 +71,6 @@
 
 #include <stout/os/realpath.hpp>
 
-#include "common/http.hpp"
 #include "common/protobuf_utils.hpp"
 #include "common/resources_utils.hpp"
 
@@ -80,6 +78,7 @@
 #include "csi/metrics.hpp"
 #include "csi/paths.hpp"
 #include "csi/rpc.hpp"
+#include "csi/service_manager.hpp"
 #include "csi/state.hpp"
 #include "csi/utils.hpp"
 
@@ -91,7 +90,6 @@
 
 #include "resource_provider/storage/provider_process.hpp"
 
-#include "slave/container_daemon.hpp"
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
 
@@ -124,7 +122,6 @@ using process::ProcessBase;
 using process::Promise;
 using process::Sequence;
 using process::spawn;
-using process::Timeout;
 
 using process::grpc::StatusError;
 
@@ -133,13 +130,13 @@ using process::http::authentication::Principal;
 using process::metrics::Counter;
 using process::metrics::PushGauge;
 
+using mesos::csi::ServiceManager;
+
 using mesos::csi::state::VolumeState;
 
 using mesos::internal::protobuf::convertLabelsToStringMap;
 using mesos::internal::protobuf::convertStringMapToLabels;
 
-using mesos::internal::slave::ContainerDaemon;
-
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::ResourceProviderState;
@@ -149,12 +146,6 @@ using mesos::v1::resource_provider::Driver;
 namespace mesos {
 namespace internal {
 
-// Timeout for a CSI plugin component to create its endpoint socket.
-//
-// TODO(chhsiao): Make the timeout configurable.
-constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
-
-
 // Returns true if the string is a valid Java identifier.
 static bool isValidName(const string& s)
 {
@@ -189,52 +180,6 @@ static bool isValidType(const string& s)
 }
 
 
-// Returns the container ID of the standalone container to run a CSI plugin
-// component. The container ID is of the following format:
-//     <cid_prefix><csi_type>-<csi_name>--<list_of_services>
-// where <cid_prefix> comes from the principal of the resource provider,
-// <csi_type> and <csi_name> are the type and name of the CSI plugin, with dots
-// replaced by dashes. <list_of_services> lists the CSI services provided by 
the
-// component, concatenated with dashes.
-static inline ContainerID getContainerId(
-    const ResourceProviderInfo& info,
-    const CSIPluginContainerInfo& container)
-{
-  const Principal principal = LocalResourceProvider::principal(info);
-  CHECK(principal.claims.contains("cid_prefix"));
-
-  string value = principal.claims.at("cid_prefix") + strings::join(
-      "-",
-      strings::replace(info.storage().plugin().type(), ".", "-"),
-      info.storage().plugin().name(),
-      "");
-
-  for (int i = 0; i < container.services_size(); i++) {
-    value += "-" + stringify(container.services(i));
-  }
-
-  ContainerID containerId;
-  containerId.set_value(value);
-
-  return containerId;
-}
-
-
-static Option<CSIPluginContainerInfo> getCSIPluginContainerInfo(
-    const ResourceProviderInfo& info,
-    const ContainerID& containerId)
-{
-  foreach (const CSIPluginContainerInfo& container,
-           info.storage().plugin().containers()) {
-    if (getContainerId(info, container) == containerId) {
-      return container;
-    }
-  }
-
-  return None();
-}
-
-
 // Returns the parent endpoint as a URL.
 // TODO(jieyu): Consider using a more reliable way to get the agent v1
 // operator API endpoint URL.
@@ -248,20 +193,6 @@ static inline http::URL extractParentEndpoint(const http::URL& url)
 }
 
 
-// Returns the 'Bearer' credential as a header for calling the V1 agent
-// API if the `authToken` is presented, or empty otherwise.
-static inline http::Headers getAuthHeader(const Option<string>& authToken)
-{
-  http::Headers headers;
-
-  if (authToken.isSome()) {
-    headers["Authorization"] = "Bearer " + authToken.get();
-  }
-
-  return headers;
-}
-
-
 static inline Resource createRawDiskResource(
     const ResourceProviderInfo& info,
     const Bytes& capacity,
@@ -405,17 +336,17 @@ template <
     csi::v0::RPC rpc,
     typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
 Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call(
-    const ContainerID& containerId,
+    const csi::Service& service,
     const csi::v0::Request<rpc>& request,
-    const bool retry)
+    const bool retry) // Make immutable in the following mutable lambda.
 {
   Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
 
   return loop(
       self(),
       [=] {
-        // Perform the call with the latest service future.
-        return getService(containerId)
+        // Make the call to the latest service endpoint.
+        return serviceManager->getServiceEndpoint(service)
           .then(defer(
               self(),
               &StorageLocalResourceProviderProcess::_call<rpc>,
@@ -443,11 +374,11 @@ Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call(
 template <csi::v0::RPC rpc>
 Future<Try<csi::v0::Response<rpc>, StatusError>>
 StorageLocalResourceProviderProcess::_call(
-    csi::v0::Client client, const csi::v0::Request<rpc>& request)
+    const string& endpoint, const csi::v0::Request<rpc>& request)
 {
   ++metrics.csi_plugin_rpcs_pending.at(rpc);
 
-  return client.call<rpc>(request)
+  return csi::v0::Client(endpoint, runtime).call<rpc>(request)
     .onAny(defer(self(), [=](
         const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) {
       --metrics.csi_plugin_rpcs_pending.at(rpc);
@@ -526,29 +457,19 @@ void StorageLocalResourceProviderProcess::initialize()
 
   bootId = _bootId.get();
 
-  foreach (const CSIPluginContainerInfo& container,
-           info.storage().plugin().containers()) {
-    if (container.services().end() != find(
-            container.services().begin(),
-            container.services().end(),
-            CSIPluginContainerInfo::NODE_SERVICE)) {
-      nodeContainerId = getContainerId(info, container);
-      break;
-    }
-  }
-
-  CHECK_SOME(nodeContainerId);
+  const Principal principal = LocalResourceProvider::principal(info);
+  CHECK(principal.claims.contains("cid_prefix"));
+  const string& containerPrefix = principal.claims.at("cid_prefix");
 
-  foreach (const CSIPluginContainerInfo& container,
-           info.storage().plugin().containers()) {
-    if (container.services().end() != find(
-            container.services().begin(),
-            container.services().end(),
-            CSIPluginContainerInfo::CONTROLLER_SERVICE)) {
-      controllerContainerId = getContainerId(info, container);
-      break;
-    }
-  }
+  serviceManager.reset(new ServiceManager(
+      extractParentEndpoint(url),
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin(),
+      {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
+      containerPrefix,
+      authToken,
+      runtime,
+      &metrics));
 
   auto die = [=](const string& message) {
     LOG(ERROR)
@@ -579,7 +500,15 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 {
   CHECK_EQ(RECOVERING, state);
 
-  return recoverServices()
+  // NOTE: The `Controller` service is supported if the plugin has the
+  // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is supported if
+  // the `Controller` service has the `PUBLISH_UNPUBLISH_VOLUME` capability. So
+  // we first launch the node plugin to get the plugin capabilities, then decide
+  // if we need to launch the controller plugin and get the node ID.
+  return serviceManager->recover()
+    .then(defer(self(), &Self::prepareIdentityService))
+    .then(defer(self(), &Self::prepareControllerService))
+    .then(defer(self(), &Self::prepareNodeService))
     .then(defer(self(), &Self::recoverVolumes))
     .then(defer(self(), &Self::recoverResourceProviderState))
     .then(defer(self(), [=]() -> Future<Nothing> {
@@ -612,135 +541,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::recoverServices()
-{
-  return getContainers()
-    .then(defer(self(), [=](
-        const hashmap<ContainerID, Option<ContainerStatus>>& runningContainers)
-        -> Future<Nothing> {
-      Try<list<string>> containerPaths = csi::paths::getContainerPaths(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name());
-
-      if (containerPaths.isError()) {
-        return Failure(
-            "Failed to find plugin containers for CSI plugin type '" +
-            info.storage().plugin().type() + "' and name '" +
-            info.storage().plugin().name() + "': " +
-            containerPaths.error());
-      }
-
-      vector<Future<Nothing>> futures;
-
-      foreach (const string& path, containerPaths.get()) {
-        Try<csi::paths::ContainerPath> containerPath =
-          csi::paths::parseContainerPath(
-              slave::paths::getCsiRootDir(workDir),
-              path);
-
-        if (containerPath.isError()) {
-          return Failure(
-              "Failed to parse container path '" + path + "': " +
-              containerPath.error());
-        }
-
-        CHECK_EQ(info.storage().plugin().type(), containerPath->type);
-        CHECK_EQ(info.storage().plugin().name(), containerPath->name);
-
-        const ContainerID& containerId = containerPath->containerId;
-
-        // NOTE: Since `getContainers` might return containers that are not
-        // actually running, to identify if the container is actually running,
-        // we check if the `executor_pid` field is set as a workaround.
-        bool isRunningContainer = runningContainers.contains(containerId) &&
-          runningContainers.at(containerId).isSome() &&
-          runningContainers.at(containerId)->has_executor_pid();
-
-        // Do not kill the up-to-date running controller or node container.
-        if ((nodeContainerId == containerId ||
-             controllerContainerId == containerId) && isRunningContainer) {
-          const string configPath = csi::paths::getContainerInfoPath(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name(),
-              containerId);
-
-          if (os::exists(configPath)) {
-            Result<CSIPluginContainerInfo> config =
-              slave::state::read<CSIPluginContainerInfo>(configPath);
-
-            if (config.isError()) {
-              return Failure(
-                  "Failed to read plugin container config from '" + configPath +
-                  "': " + config.error());
-            }
-
-            if (config.isSome() &&
-                getCSIPluginContainerInfo(info, containerId) == config.get()) {
-              continue;
-            }
-          }
-        }
-
-        LOG(INFO) << "Cleaning up plugin container '" << containerId << "'";
-
-        // Otherwise, kill the container only if it is actually running (i.e.,
-        // not already being destroyed), then wait for the container to be
-        // destroyed before performing the cleanup despite if we kill it.
-        Future<Nothing> cleanup = Nothing();
-        if (runningContainers.contains(containerId)) {
-          if (isRunningContainer) {
-            cleanup = killContainer(containerId);
-          }
-          cleanup = cleanup
-            .then(defer(self(), &Self::waitContainer, containerId));
-        }
-
-        cleanup = cleanup
-          .then(defer(self(), [=]() -> Future<Nothing> {
-            Result<string> endpointDir =
-              os::realpath(csi::paths::getEndpointDirSymlinkPath(
-                  slave::paths::getCsiRootDir(workDir),
-                  info.storage().plugin().type(),
-                  info.storage().plugin().name(),
-                  containerId));
-
-            if (endpointDir.isSome()) {
-              Try<Nothing> rmdir = os::rmdir(endpointDir.get());
-              if (rmdir.isError()) {
-                return Failure(
-                    "Failed to remove endpoint directory '" +
-                    endpointDir.get() + "': " + rmdir.error());
-              }
-            }
-
-            Try<Nothing> rmdir = os::rmdir(path);
-                if (rmdir.isError()) {
-              return Failure(
-                  "Failed to remove plugin container directory '" + path +
-                  "': " + rmdir.error());
-            }
-
-            return Nothing();
-          }));
-
-        futures.push_back(cleanup);
-      }
-
-      return collect(futures).then([] { return Nothing(); });
-    }))
-    // NOTE: The `Controller` service is supported if the plugin has the
-    // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is supported if
-    // the `Controller` service has the `PUBLISH_UNPUBLISH_VOLUME` capability.
-    // So we first launch the node plugin to get the plugin capabilities, then
-    // decide if we need to launch the controller plugin and get the node ID.
-    .then(defer(self(), &Self::prepareIdentityService))
-    .then(defer(self(), &Self::prepareControllerService))
-    .then(defer(self(), &Self::prepareNodeService));
-}
-
-
 Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 {
   // Recover the states of CSI volumes.
@@ -1874,311 +1674,11 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 }
 
 
-Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
-    const string& endpoint)
-{
-  Future<csi::v0::Client> service;
-
-  if (os::exists(endpoint)) {
-    service = csi::v0::Client("unix://" + endpoint, runtime);
-  } else {
-    // Wait for the endpoint socket to appear until the timeout expires.
-    Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
-
-    service = loop(
-        self(),
-        [=]() -> Future<Nothing> {
-          if (timeout.expired()) {
-            return Failure("Timed out waiting for endpoint '" + endpoint + "'");
-          }
-
-          return after(Milliseconds(10));
-        },
-        [=](const Nothing&) -> ControlFlow<csi::v0::Client> {
-          if (os::exists(endpoint)) {
-            return Break(csi::v0::Client("unix://" + endpoint, runtime));
-          }
-
-          return Continue();
-        });
-  }
-
-  return service
-    .then(defer(self(), [=](csi::v0::Client client) {
-      return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
-        .then([=](const Try<csi::v0::ProbeResponse, StatusError>& result)
-            -> Future<csi::v0::Client> {
-          if (result.isError()) {
-            return Failure(result.error());
-          }
-
-          return client;
-        });
-    }));
-}
-
-
-Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
-    const ContainerID& containerId)
-{
-  if (daemons.contains(containerId)) {
-    CHECK(services.contains(containerId));
-    return services.at(containerId)->future();
-  }
-
-  Option<CSIPluginContainerInfo> config =
-    getCSIPluginContainerInfo(info, containerId);
-  CHECK_SOME(config);
-
-  // We checkpoint the config first to keep track of the plugin container even
-  // if we fail to create its container daemon.
-  const string configPath = csi::paths::getContainerInfoPath(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name(),
-      containerId);
-
-  Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get());
-  if (checkpoint.isError()) {
-    return Failure(
-        "Failed to checkpoint plugin container config to '" + configPath +
-        "': " + checkpoint.error());
-  }
-
-  CommandInfo commandInfo;
-  if (config->has_command()) {
-    commandInfo.CopyFrom(config->command());
-  }
-
-  // Set the `CSI_ENDPOINT` environment variable.
-  Try<string> endpoint = csi::paths::getEndpointSocketPath(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name(),
-      containerId);
-
-  if (endpoint.isError()) {
-    return Failure(
-        "Failed to resolve endpoint path for plugin container '" +
-        stringify(containerId) + "': " + endpoint.error());
-  }
-
-  const string& endpointPath = endpoint.get();
-  Environment::Variable* endpointVar =
-    commandInfo.mutable_environment()->add_variables();
-  endpointVar->set_name("CSI_ENDPOINT");
-  endpointVar->set_value("unix://" + endpointPath);
-
-  ContainerInfo containerInfo;
-  if (config->has_container()) {
-    containerInfo.CopyFrom(config->container());
-  } else {
-    containerInfo.set_type(ContainerInfo::MESOS);
-  }
-
-  // Prepare a volume where the endpoint socket will be placed.
-  const string endpointDir = Path(endpointPath).dirname();
-  Volume* endpointVolume = containerInfo.add_volumes();
-  endpointVolume->set_mode(Volume::RW);
-  endpointVolume->set_container_path(endpointDir);
-  endpointVolume->set_host_path(endpointDir);
-
-  // Prepare the directory where the mount points will be placed.
-  const string mountRootDir = csi::paths::getMountRootDir(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name());
-
-  Try<Nothing> mkdir = os::mkdir(mountRootDir);
-  if (mkdir.isError()) {
-    return Failure(
-        "Failed to create directory '" + mountRootDir + "': " + mkdir.error());
-  }
-
-  // Prepare a volume where the mount points will be placed.
-  Volume* mountVolume = containerInfo.add_volumes();
-  mountVolume->set_mode(Volume::RW);
-  mountVolume->set_container_path(mountRootDir);
-  mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH);
-  mountVolume->mutable_source()->mutable_host_path()->set_path(mountRootDir);
-  mountVolume->mutable_source()->mutable_host_path()
-    ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
-
-  CHECK(!services.contains(containerId));
-  services[containerId].reset(new Promise<csi::v0::Client>());
-
-  Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
-      extractParentEndpoint(url),
-      authToken,
-      containerId,
-      commandInfo,
-      config->resources(),
-      containerInfo,
-      std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
-        LOG(INFO)
-          << "CSI plugin container '" << containerId << "' started for plugin"
-          << " type '" << info.storage().plugin().type() << "' and "
-          << " name '" << info.storage().plugin().name() << "'";
-
-        CHECK(services.at(containerId)->associate(waitService(endpointPath)));
-        return services.at(containerId)->future()
-          .then([] { return Nothing(); });
-      })),
-      std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
-        ++metrics.csi_plugin_container_terminations;
-
-        services.at(containerId)->discard();
-        services.at(containerId).reset(new Promise<csi::v0::Client>());
-
-        LOG(INFO)
-          << "CSI plugin container '" << containerId << "' stopped for plugin"
-          << " type '" << info.storage().plugin().type() << "' and "
-          << " name '" << info.storage().plugin().name() << "'";
-
-        if (os::exists(endpointPath)) {
-          Try<Nothing> rm = os::rm(endpointPath);
-          if (rm.isError()) {
-            return Failure(
-                "Failed to remove endpoint '" + endpointPath + "': " +
-                rm.error());
-          }
-        }
-
-        return Nothing();
-      })));
-
-  if (daemon.isError()) {
-    return Failure(
-        "Failed to create container daemon for plugin container '" +
-        stringify(containerId) + "': " + daemon.error());
-  }
-
-  auto die = [=](const string& message) {
-    LOG(ERROR)
-      << "Container daemon for '" << containerId << "' failed: " << message;
-    fatal();
-  };
-
-  daemons[containerId] = daemon.get();
-  daemon.get()->wait()
-    .onFailed(defer(self(), std::bind(die, lambda::_1)))
-    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
-
-  return services.at(containerId)->future();
-}
-
-
-Future<hashmap<ContainerID, Option<ContainerStatus>>>
-StorageLocalResourceProviderProcess::getContainers()
-{
-  agent::Call call;
-  call.set_type(agent::Call::GET_CONTAINERS);
-  call.mutable_get_containers()->set_show_nested(false);
-  call.mutable_get_containers()->set_show_standalone(true);
-
-  return http::post(
-      extractParentEndpoint(url),
-      getAuthHeader(authToken) +
-        http::Headers{{"Accept", stringify(contentType)}},
-      serialize(contentType, evolve(call)),
-      stringify(contentType))
-    .then(defer(self(), [=](const http::Response& httpResponse)
-        -> Future<hashmap<ContainerID, Option<ContainerStatus>>> {
-      hashmap<ContainerID, Option<ContainerStatus>> result;
-
-      if (httpResponse.status != http::OK().status) {
-        return Failure(
-            "Failed to get containers: Unexpected response '" +
-            httpResponse.status + "' (" + httpResponse.body + ")");
-      }
-
-      Try<v1::agent::Response> v1Response =
-        deserialize<v1::agent::Response>(contentType, httpResponse.body);
-      if (v1Response.isError()) {
-        return Failure("Failed to get containers: " + v1Response.error());
-      }
-
-      const Principal principal = LocalResourceProvider::principal(info);
-      CHECK(principal.claims.contains("cid_prefix"));
-
-      const string& cidPrefix = principal.claims.at("cid_prefix");
-
-      agent::Response response = devolve(v1Response.get());
-      foreach (const agent::Response::GetContainers::Container& container,
-               response.get_containers().containers()) {
-        if (strings::startsWith(container.container_id().value(), cidPrefix)) {
-          result.put(
-              container.container_id(),
-              container.has_container_status()
-                ? container.container_status()
-                : Option<ContainerStatus>::none());
-        }
-      }
-
-      return result;
-    }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
-    const ContainerID& containerId)
-{
-  agent::Call call;
-  call.set_type(agent::Call::WAIT_CONTAINER);
-  call.mutable_wait_container()->mutable_container_id()->CopyFrom(containerId);
-
-  return http::post(
-      extractParentEndpoint(url),
-      getAuthHeader(authToken),
-      serialize(contentType, evolve(call)),
-      stringify(contentType))
-    .then([containerId](const http::Response& response) -> Future<Nothing> {
-      if (response.status != http::OK().status &&
-          response.status != http::NotFound().status) {
-        return Failure(
-            "Failed to wait for container '" + stringify(containerId) +
-            "': Unexpected response '" + response.status + "' (" + response.body
-            + ")");
-      }
-
-      return Nothing();
-    });
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::killContainer(
-    const ContainerID& containerId)
-{
-  agent::Call call;
-  call.set_type(agent::Call::KILL_CONTAINER);
-  call.mutable_kill_container()->mutable_container_id()->CopyFrom(containerId);
-
-  return http::post(
-      extractParentEndpoint(url),
-      getAuthHeader(authToken),
-      serialize(contentType, evolve(call)),
-      stringify(contentType))
-    .then([containerId](const http::Response& response) -> Future<Nothing> {
-      if (response.status != http::OK().status &&
-          response.status != http::NotFound().status) {
-        return Failure(
-            "Failed to kill container '" + stringify(containerId) +
-            "': Unexpected response '" + response.status + "' (" + response.body
-            + ")");
-      }
-
-      return Nothing();
-    });
-}
-
-
 Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
 {
-  CHECK_SOME(nodeContainerId);
-
   // Get the plugin info.
   return call<csi::v0::GET_PLUGIN_INFO>(
-      nodeContainerId.get(), csi::v0::GetPluginInfoRequest())
+      csi::NODE_SERVICE, csi::v0::GetPluginInfoRequest())
     .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
       pluginInfo = response;
 
@@ -2186,7 +1686,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
 
       // Get the plugin capabilities.
       return call<csi::v0::GET_PLUGIN_CAPABILITIES>(
-          nodeContainerId.get(), csi::v0::GetPluginCapabilitiesRequest());
+          csi::NODE_SERVICE, csi::v0::GetPluginCapabilitiesRequest());
     }))
     .then(defer(self(), [=](
         const csi::v0::GetPluginCapabilitiesResponse& response) {
@@ -2205,14 +1705,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
     return Nothing();
   }
 
-  if (controllerContainerId.isNone()) {
-    return Failure(
-        stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
-  }
-
   // Get the controller plugin info and check for consistency.
   return call<csi::v0::GET_PLUGIN_INFO>(
-      controllerContainerId.get(), csi::v0::GetPluginInfoRequest())
+      csi::CONTROLLER_SERVICE, csi::v0::GetPluginInfoRequest())
     .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
       LOG(INFO) << "Controller plugin loaded: " << stringify(response);
 
@@ -2225,8 +1720,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 
       // Get the controller capabilities.
       return call<csi::v0::CONTROLLER_GET_CAPABILITIES>(
-          controllerContainerId.get(),
-          csi::v0::ControllerGetCapabilitiesRequest());
+          csi::CONTROLLER_SERVICE, csi::v0::ControllerGetCapabilitiesRequest());
     }))
     .then(defer(self(), [=](
         const csi::v0::ControllerGetCapabilitiesResponse& response) {
@@ -2239,11 +1733,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 
 Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 {
-  CHECK_SOME(nodeContainerId);
-
   // Get the node capabilities.
   return call<csi::v0::NODE_GET_CAPABILITIES>(
-      nodeContainerId.get(), csi::v0::NodeGetCapabilitiesRequest())
+      csi::NODE_SERVICE, csi::v0::NodeGetCapabilitiesRequest())
     .then(defer(self(), [=](
         const csi::v0::NodeGetCapabilitiesResponse& response)
         -> Future<Nothing> {
@@ -2255,7 +1747,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 
       // Get the node ID.
       return call<csi::v0::NODE_GET_ID>(
-          nodeContainerId.get(), csi::v0::NodeGetIdRequest())
+          csi::NODE_SERVICE, csi::v0::NodeGetIdRequest())
         .then(defer(self(), [=](const csi::v0::NodeGetIdResponse& response) {
           nodeId = response.node_id();
 
@@ -2296,10 +1788,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
   request.set_readonly(false);
   *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-  CHECK_SOME(controllerContainerId);
-
   return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
-      controllerContainerId.get(), std::move(request))
+      csi::CONTROLLER_SERVICE, std::move(request))
     .then(defer(self(), [this, volumeId](
         const csi::v0::ControllerPublishVolumeResponse& response) {
       VolumeData& volume = volumes.at(volumeId);
@@ -2345,10 +1835,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
   request.set_volume_id(volumeId);
   request.set_node_id(nodeId.get());
 
-  CHECK_SOME(controllerContainerId);
-
   return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
-      controllerContainerId.get(), std::move(request))
+      csi::CONTROLLER_SERVICE, std::move(request))
     .then(defer(self(), [this, volumeId] {
       VolumeData& volume = volumes.at(volumeId);
 
@@ -2406,10 +1894,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
   *request.mutable_volume_capability() = volume.state.volume_capability();
   *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-  CHECK_SOME(nodeContainerId);
-
-  return call<csi::v0::NODE_STAGE_VOLUME>(
-      nodeContainerId.get(), std::move(request))
+  return call<csi::v0::NODE_STAGE_VOLUME>(csi::NODE_SERVICE, std::move(request))
     .then(defer(self(), [this, volumeId] {
       VolumeData& volume = volumes.at(volumeId);
 
@@ -2462,10 +1947,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
   request.set_volume_id(volumeId);
   request.set_staging_target_path(stagingPath);
 
-  CHECK_SOME(nodeContainerId);
-
   return call<csi::v0::NODE_UNSTAGE_VOLUME>(
-      nodeContainerId.get(), std::move(request))
+      csi::NODE_SERVICE, std::move(request))
     .then(defer(self(), [this, volumeId] {
       VolumeData& volume = volumes.at(volumeId);
 
@@ -2527,10 +2010,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
     request.set_staging_target_path(stagingPath);
   }
 
-  CHECK_SOME(nodeContainerId);
-
   return call<csi::v0::NODE_PUBLISH_VOLUME>(
-      nodeContainerId.get(), std::move(request))
+      csi::NODE_SERVICE, std::move(request))
     .then(defer(self(), [this, volumeId] {
       VolumeData& volume = volumes.at(volumeId);
 
@@ -2579,10 +2060,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
   request.set_volume_id(volumeId);
   request.set_target_path(targetPath);
 
-  CHECK_SOME(nodeContainerId);
-
   return call<csi::v0::NODE_UNPUBLISH_VOLUME>(
-      nodeContainerId.get(), std::move(request))
+      csi::NODE_SERVICE, std::move(request))
     .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
       VolumeData& volume = volumes.at(volumeId);
 
@@ -2611,10 +2090,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
   *request.add_volume_capabilities() = profileInfo.capability;
   *request.mutable_parameters() = profileInfo.parameters;
 
-  CHECK_SOME(controllerContainerId);
-
   return call<csi::v0::CREATE_VOLUME>(
-      controllerContainerId.get(), std::move(request), true) // Retry.
+      csi::CONTROLLER_SERVICE, std::move(request), true) // Retry.
     .then(defer(self(), [=](
         const csi::v0::CreateVolumeResponse& response) -> string {
       const csi::v0::Volume& volume = response.volume();
@@ -2728,10 +2205,8 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
       csi::v0::DeleteVolumeRequest request;
       request.set_volume_id(volumeId);
 
-      CHECK_SOME(controllerContainerId);
-
       return call<csi::v0::DELETE_VOLUME>(
-          controllerContainerId.get(), std::move(request), true) // Retry.
+          csi::CONTROLLER_SERVICE, std::move(request), true) // Retry.
         .then([] { return Nothing(); });
     }));
   }
@@ -2804,10 +2279,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
   *request.add_volume_capabilities() = profileInfo.capability;
   *request.mutable_volume_attributes() = volumeAttributes;
 
-  CHECK_SOME(controllerContainerId);
-
   return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
-      controllerContainerId.get(), std::move(request))
+      csi::CONTROLLER_SERVICE, std::move(request))
     .then(defer(self(), [=](
         const csi::v0::ValidateVolumeCapabilitiesResponse& response)
         -> Future<Nothing> {
@@ -2840,12 +2313,10 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
     return Resources();
   }
 
-  CHECK_SOME(controllerContainerId);
-
   // TODO(chhsiao): Set the max entries and use a loop to do
   // multiple `ListVolumes` calls.
   return call<csi::v0::LIST_VOLUMES>(
-      controllerContainerId.get(), csi::v0::ListVolumesRequest())
+      csi::CONTROLLER_SERVICE, csi::v0::ListVolumesRequest())
     .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
       Resources resources;
 
@@ -2888,8 +2359,6 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
     return Resources();
   }
 
-  CHECK_SOME(controllerContainerId);
-
   vector<Future<Resources>> futures;
 
   foreachpair (const string& profile,
@@ -2901,7 +2370,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 
     futures.push_back(
         call<csi::v0::GET_CAPACITY>(
-            controllerContainerId.get(), std::move(request))
+            csi::CONTROLLER_SERVICE, std::move(request))
           .then(defer(self(), [=](
               const csi::v0::GetCapacityResponse& response) -> Resources {
             if (response.available_capacity() == 0) {
@@ -3788,15 +3257,14 @@ Option<Error> StorageLocalResourceProvider::validate(
     if (container.services().end() != find(
             container.services().begin(),
             container.services().end(),
-            CSIPluginContainerInfo::NODE_SERVICE)) {
+            csi::NODE_SERVICE)) {
       hasNodeService = true;
       break;
     }
   }
 
   if (!hasNodeService) {
-    return Error(
-        stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");
+    return Error(stringify(csi::NODE_SERVICE) + " not found");
   }
 
   return None();
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index 8640fbc..01f5fce 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -17,13 +17,11 @@
 #ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
 #define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
 
-#include <functional>
 #include <memory>
 #include <string>
 #include <type_traits>
 #include <vector>
 
-#include <mesos/http.hpp>
 #include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
@@ -53,14 +51,12 @@
 #include <stout/try.hpp>
 #include <stout/uuid.hpp>
 
-#include "csi/client.hpp"
 #include "csi/metrics.hpp"
 #include "csi/rpc.hpp"
+#include "csi/service_manager.hpp"
 #include "csi/state.hpp"
 #include "csi/utils.hpp"
 
-#include "slave/container_daemon.hpp"
-
 #include "status_update_manager/operation.hpp"
 
 namespace mesos {
@@ -118,13 +114,13 @@ public:
       csi::v0::RPC rpc,
       typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
   process::Future<csi::v0::Response<rpc>> call(
-      const ContainerID& containerId,
+      const csi::Service& service,
       const csi::v0::Request<rpc>& request,
-      const bool retry = false); // remains const in a mutable lambda.
+      bool retry = false);
 
   template <csi::v0::RPC rpc>
   process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>>
-  _call(csi::v0::Client client, const csi::v0::Request<rpc>& request);
+  _call(const std::string& endpoint, const csi::v0::Request<rpc>& request);
 
   template <csi::v0::RPC rpc>
   process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call(
@@ -150,7 +146,6 @@ private:
   // The recover functions are responsible to recover the state of the
   // resource provider and CSI volumes from checkpointed data.
   process::Future<Nothing> recover();
-  process::Future<Nothing> recoverServices();
   process::Future<Nothing> recoverVolumes();
   process::Future<Nothing> recoverResourceProviderState();
 
@@ -194,27 +189,6 @@ private:
   void reconcileOperations(
       const resource_provider::Event::ReconcileOperations& reconcile);
 
-  // Returns a future of a CSI client that waits for the endpoint socket to
-  // appear if necessary, then connects to the socket and check its readiness.
-  process::Future<csi::v0::Client> waitService(const std::string& endpoint);
-
-  // Returns a future of the latest CSI client for the specified plugin
-  // container. If the container is not already running, this method will start
-  // a new a new container daemon.
-  process::Future<csi::v0::Client> getService(const ContainerID& containerId);
-
-  // Lists all running plugin containers for this resource provider.
-  // NOTE: This might return containers that are not actually running, e.g., if
-  // they are being destroyed.
-  process::Future<hashmap<ContainerID, Option<ContainerStatus>>>
-  getContainers();
-
-  // Waits for the specified plugin container to be terminated.
-  process::Future<Nothing> waitContainer(const ContainerID& containerId);
-
-  // Kills the specified plugin container.
-  process::Future<Nothing> killContainer(const ContainerID& containerId);
-
   process::Future<Nothing> prepareIdentityService();
 
   // NOTE: This can only be called after `prepareIdentityService`.
@@ -367,12 +341,8 @@ private:
   // The mapping of known profiles fetched from the DiskProfileAdaptor.
   hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;
 
-  hashmap<ContainerID, process::Owned<slave::ContainerDaemon>> daemons;
-  hashmap<ContainerID, process::Owned<process::Promise<csi::v0::Client>>>
-    services;
+  process::Owned<csi::ServiceManager> serviceManager;
 
-  Option<ContainerID> nodeContainerId;
-  Option<ContainerID> controllerContainerId;
   Option<csi::v0::GetPluginInfoResponse> pluginInfo;
   csi::v0::PluginCapabilities pluginCapabilities;
   csi::v0::ControllerCapabilities controllerCapabilities;

Reply via email to