This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 7e5a0e2d04722ad3a261dcd43163ebfaae73c2a3 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Mon Apr 1 23:23:45 2019 -0700 Added skeleton code for v0 `VolumeManager`. Review: https://reviews.apache.org/r/70214/ --- src/CMakeLists.txt | 1 + src/Makefile.am | 3 + src/csi/v0_volume_manager.cpp | 276 ++++++++++++++++++++++++++++++++++ src/csi/v0_volume_manager.hpp | 116 ++++++++++++++ src/csi/v0_volume_manager_process.hpp | 107 +++++++++++++ src/csi/volume_manager.cpp | 17 ++- 6 files changed, 518 insertions(+), 2 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4a7518f..20e50a4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -248,6 +248,7 @@ set(CSI_SRC csi/rpc.cpp csi/service_manager.cpp csi/utils.cpp + csi/v0_volume_manager.cpp csi/volume_manager.cpp) set(DOCKER_SRC diff --git a/src/Makefile.am b/src/Makefile.am index d91d1a1..89a4090 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1579,6 +1579,9 @@ libcsi_la_SOURCES = \ csi/state.proto \ csi/utils.cpp \ csi/utils.hpp \ + csi/v0_volume_manager.cpp \ + csi/v0_volume_manager.hpp \ + csi/v0_volume_manager_process.hpp \ csi/volume_manager.cpp \ csi/volume_manager.hpp \ ../include/csi/spec.hpp diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp new file mode 100644 index 0000000..2a4d3eb --- /dev/null +++ b/src/csi/v0_volume_manager.cpp @@ -0,0 +1,276 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "csi/v0_volume_manager.hpp" + +#include <process/defer.hpp> +#include <process/dispatch.hpp> +#include <process/id.hpp> +#include <process/process.hpp> + +#include <stout/check.hpp> + +#include "csi/v0_volume_manager_process.hpp" + +namespace http = process::http; + +using std::string; +using std::vector; + +using google::protobuf::Map; + +using process::Failure; +using process::Future; +using process::ProcessBase; + +using process::grpc::client::Runtime; + +namespace mesos{ +namespace csi { +namespace v0 { + +VolumeManagerProcess::VolumeManagerProcess( + const http::URL& agentUrl, + const string& _rootDir, + const CSIPluginInfo& _info, + const hashset<Service> _services, + const string& containerPrefix, + const Option<string>& authToken, + const Runtime& _runtime, + Metrics* _metrics) + : ProcessBase(process::ID::generate("csi-v0-volume-manager")), + rootDir(_rootDir), + info(_info), + services(_services), + runtime(_runtime), + metrics(_metrics), + serviceManager(new ServiceManager( + agentUrl, + rootDir, + info, + services, + containerPrefix, + authToken, + runtime, + metrics)) +{ + // This should have been validated in `VolumeManager::create`. + CHECK(!services.empty()) + << "Must specify at least one service for CSI plugin type '" << info.type() + << "' and name '" << info.name() << "'"; +} + + +Future<Nothing> VolumeManagerProcess::recover() +{ + return Failure("Unimplemented"); +} + + +Future<vector<VolumeInfo>> VolumeManagerProcess::listVolumes() +{ + return Failure("Unimplemented"); +} + + +Future<Bytes> VolumeManagerProcess::getCapacity( + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + return Failure("Unimplemented"); +} + + +Future<VolumeInfo> VolumeManagerProcess::createVolume( + const string& name, + const Bytes& capacity, + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + return Failure("Unimplemented"); +} + + +Future<Option<Error>> VolumeManagerProcess::validateVolume( + const VolumeInfo& volumeInfo, + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + return Failure("Unimplemented"); +} + + +Future<bool> VolumeManagerProcess::deleteVolume(const string& volumeId) +{ + return Failure("Unimplemented"); +} + + +Future<Nothing> VolumeManagerProcess::attachVolume(const string& volumeId) +{ + return Failure("Unimplemented"); +} + + +Future<Nothing> VolumeManagerProcess::detachVolume(const string& volumeId) +{ + return Failure("Unimplemented"); +} + + +Future<Nothing> VolumeManagerProcess::publishVolume(const string& volumeId) +{ + return Failure("Unimplemented"); +} + + +Future<Nothing> VolumeManagerProcess::unpublishVolume(const string& volumeId) +{ + return Failure("Unimplemented"); +} + + +VolumeManager::VolumeManager( + const http::URL& agentUrl, + const string& rootDir, + const CSIPluginInfo& info, + const hashset<Service>& services, + const string& containerPrefix, + const Option<string>& authToken, + const Runtime& runtime, + Metrics* metrics) + : process(new VolumeManagerProcess( + agentUrl, + rootDir, + info, + services, + containerPrefix, + authToken, + runtime, + metrics)) +{ + process::spawn(CHECK_NOTNULL(process.get())); + recovered = process::dispatch(process.get(), &VolumeManagerProcess::recover); +} + + +VolumeManager::~VolumeManager() +{ + process::terminate(process.get()); + process::wait(process.get()); +} + + +Future<Nothing> VolumeManager::recover() +{ + return recovered; +} + + +Future<vector<VolumeInfo>> VolumeManager::listVolumes() +{ + return recovered + .then(process::defer(process.get(), &VolumeManagerProcess::listVolumes)); +} + + +Future<Bytes> VolumeManager::getCapacity( + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + return recovered + .then(process::defer( + process.get(), + &VolumeManagerProcess::getCapacity, + capability, + parameters)); +} + + +Future<VolumeInfo> VolumeManager::createVolume( + const string& name, + const Bytes& capacity, + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + return recovered + .then(process::defer( + process.get(), + &VolumeManagerProcess::createVolume, + name, + capacity, + capability, + parameters)); +} + + +Future<Option<Error>> VolumeManager::validateVolume( + const VolumeInfo& volumeInfo, + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + return recovered + .then(process::defer( + process.get(), + &VolumeManagerProcess::validateVolume, + volumeInfo, + capability, + parameters)); +} + + +Future<bool> VolumeManager::deleteVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::deleteVolume, volumeId)); +} + + +Future<Nothing> VolumeManager::attachVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::attachVolume, volumeId)); +} + + +Future<Nothing> VolumeManager::detachVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::detachVolume, volumeId)); +} + + +Future<Nothing> VolumeManager::publishVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::publishVolume, volumeId)); +} + + +Future<Nothing> VolumeManager::unpublishVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::unpublishVolume, volumeId)); +} + +} // namespace v0 { +} // namespace csi { +} // namespace mesos { diff --git a/src/csi/v0_volume_manager.hpp b/src/csi/v0_volume_manager.hpp new file mode 100644 index 0000000..6c15f29 --- /dev/null +++ b/src/csi/v0_volume_manager.hpp @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __CSI_V0_VOLUME_MANAGER_HPP__ +#define __CSI_V0_VOLUME_MANAGER_HPP__ + +#include <string> +#include <vector> + +#include <google/protobuf/map.h> + +#include <mesos/mesos.hpp> + +#include <mesos/csi/types.hpp> + +#include <process/future.hpp> +#include <process/grpc.hpp> +#include <process/http.hpp> +#include <process/owned.hpp> + +#include <stout/bytes.hpp> +#include <stout/error.hpp> +#include <stout/hashset.hpp> +#include <stout/nothing.hpp> +#include <stout/option.hpp> + +#include "csi/metrics.hpp" +#include "csi/service_manager.hpp" +#include "csi/volume_manager.hpp" + +namespace mesos { +namespace csi { +namespace v0 { + +// Forward declarations. +class VolumeManagerProcess; + + +class VolumeManager : public csi::VolumeManager +{ +public: + VolumeManager( + const process::http::URL& agentUrl, + const std::string& rootDir, + const CSIPluginInfo& info, + const hashset<Service>& services, + const std::string& containerPrefix, + const Option<std::string>& authToken, + const process::grpc::client::Runtime& runtime, + Metrics* metrics); + + // Since this class contains `Owned` members which should not but can be + // copied, explicitly make this class non-copyable. + // + // TODO(chhsiao): Remove this once MESOS-5122 is fixed. + VolumeManager(const VolumeManager&) = delete; + VolumeManager& operator=(const VolumeManager&) = delete; + + ~VolumeManager() override; + + process::Future<Nothing> recover() override; + + process::Future<std::vector<VolumeInfo>> listVolumes() override; + + process::Future<Bytes> getCapacity( + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters) + override; + + process::Future<VolumeInfo> createVolume( + const std::string& name, + const Bytes& capacity, + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters) + override; + + process::Future<Option<Error>> validateVolume( + const VolumeInfo& volumeInfo, + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters) + override; + + process::Future<bool> deleteVolume(const std::string& volumeId) override; + + process::Future<Nothing> attachVolume(const std::string& volumeId) override; + + process::Future<Nothing> detachVolume(const std::string& volumeId) override; + + process::Future<Nothing> publishVolume(const std::string& volumeId) override; + + process::Future<Nothing> unpublishVolume( + const std::string& volumeId) override; + +private: + process::Owned<VolumeManagerProcess> process; + process::Future<Nothing> recovered; +}; + +} // namespace v0 { +} // namespace csi { +} // namespace mesos { + +#endif // __CSI_V0_VOLUME_MANAGER_HPP__ diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp new file mode 100644 index 0000000..9db99de --- /dev/null +++ b/src/csi/v0_volume_manager_process.hpp @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __CSI_VOLUME_MANAGER_PROCESS_HPP__ +#define __CSI_VOLUME_MANAGER_PROCESS_HPP__ + +#include <string> +#include <vector> + +#include <google/protobuf/map.h> + +#include <mesos/mesos.hpp> + +#include <mesos/csi/types.hpp> + +#include <process/future.hpp> +#include <process/grpc.hpp> +#include <process/http.hpp> +#include <process/owned.hpp> +#include <process/process.hpp> + +#include <stout/bytes.hpp> +#include <stout/error.hpp> +#include <stout/hashset.hpp> +#include <stout/nothing.hpp> +#include <stout/option.hpp> + +#include "csi/metrics.hpp" +#include "csi/service_manager.hpp" +#include "csi/v0_volume_manager.hpp" +#include "csi/volume_manager.hpp" + +namespace mesos { +namespace csi { +namespace v0 { + + +class VolumeManagerProcess : public process::Process<VolumeManagerProcess> +{ +public: + explicit VolumeManagerProcess( + const process::http::URL& agentUrl, + const std::string& _rootDir, + const CSIPluginInfo& _info, + const hashset<Service> _services, + const std::string& containerPrefix, + const Option<std::string>& authToken, + const process::grpc::client::Runtime& _runtime, + Metrics* _metrics); + + process::Future<Nothing> recover(); + + process::Future<std::vector<VolumeInfo>> listVolumes(); + + process::Future<Bytes> getCapacity( + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters); + + process::Future<VolumeInfo> createVolume( + const std::string& name, + const Bytes& capacity, + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters); + + process::Future<Option<Error>> validateVolume( + const VolumeInfo& volumeInfo, + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters); + + process::Future<bool> deleteVolume(const std::string& volumeId); + + process::Future<Nothing> attachVolume(const std::string& volumeId); + + process::Future<Nothing> detachVolume(const std::string& volumeId); + + process::Future<Nothing> publishVolume(const std::string& volumeId); + + process::Future<Nothing> unpublishVolume(const std::string& volumeId); + +private: + const std::string rootDir; + const CSIPluginInfo info; + const hashset<Service> services; + + process::grpc::client::Runtime runtime; + Metrics* metrics; + process::Owned<ServiceManager> serviceManager; +}; + +} // namespace v0 { +} // namespace csi { +} // namespace mesos { + +#endif // __CSI_VOLUME_MANAGER_PROCESS_HPP__ diff --git a/src/csi/volume_manager.cpp b/src/csi/volume_manager.cpp index e73f42e..cbe45cb 100644 --- a/src/csi/volume_manager.cpp +++ b/src/csi/volume_manager.cpp @@ -16,12 +16,18 @@ #include "csi/volume_manager.hpp" +#include <process/grpc.hpp> + +#include "csi/v0_volume_manager.hpp" + namespace http = process::http; using std::string; using process::Owned; +using process::grpc::client::Runtime; + namespace mesos { namespace csi { @@ -40,8 +46,15 @@ Try<Owned<VolumeManager>> VolumeManager::create( info.type() + "' and name '" + info.name() + "'"); } - // TODO(chhsiao): Add a v0 VolumeManager. - return Error("Unimplemented"); + return new v0::VolumeManager( + agentUrl, + rootDir, + info, + services, + containerPrefix, + authToken, + Runtime(), + metrics); } } // namespace csi {
