This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 0fddc51b2e7aebdd2c7409d06bba4870bf1c14a4 Author: Chun-Hung Hsiao <chhs...@apache.org> AuthorDate: Mon Apr 1 23:23:53 2019 -0700 Cleanup volume and storage pool listing. This patch modified SLRP's volume and storage pool listing methods to conform to `VolumeManager`'s public interface. They will be moved out from SLRP to v0 `VolumeManager` later. Review: https://reviews.apache.org/r/70284/ --- src/resource_provider/storage/provider.cpp | 156 ++++++++++++--------- src/resource_provider/storage/provider_process.hpp | 9 +- 2 files changed, 95 insertions(+), 70 deletions(-) diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 5a63e7b..c5a5213 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -779,7 +779,7 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState() { return reconcileOperationStatuses() .then(defer(self(), [=] { - return collect(vector<Future<Resources>>{listVolumes(), getCapacities()}) + return collect<Resources>({getRawVolumes(), getStoragePools()}) .then(defer(self(), [=](const vector<Resources>& discovered) { ResourceConversion conversion = reconcileResources( totalResources, @@ -986,7 +986,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools() fatal(); }; - return getCapacities() + return getStoragePools() .then(defer(self(), [=](const Resources& discovered) { ResourceConversion conversion = reconcileResources( totalResources.filter( @@ -1109,6 +1109,71 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources( } +Future<Resources> StorageLocalResourceProviderProcess::getRawVolumes() +{ + CHECK(info.has_id()); + + return listVolumes() + .then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) { + Resources resources; + + // Recover disk profiles from the checkpointed state. 
+ hashmap<string, string> volumesToProfiles; + foreach (const Resource& resource, totalResources) { + if (resource.disk().source().has_id() && + resource.disk().source().has_profile()) { + volumesToProfiles.put( + resource.disk().source().id(), + resource.disk().source().profile()); + } + } + + foreach (const VolumeInfo& volumeInfo, volumeInfos) { + resources += createRawDiskResource( + info, + volumeInfo.capacity, + volumesToProfiles.contains(volumeInfo.id) + ? volumesToProfiles.at(volumeInfo.id) + : Option<string>::none(), + vendor, + volumeInfo.id, + volumeInfo.context.empty() + ? Option<Labels>::none() + : convertStringMapToLabels(volumeInfo.context)); + } + + return resources; + })); +} + + +Future<Resources> StorageLocalResourceProviderProcess::getStoragePools() +{ + CHECK(info.has_id()); + + vector<Future<Resources>> futures; + + foreachpair (const string& profile, + const DiskProfileAdaptor::ProfileInfo& profileInfo, + profileInfos) { + futures.push_back( + getCapacity(profileInfo.capability, profileInfo.parameters) + .then(defer(self(), [=](const Bytes& capacity) -> Resources { + if (capacity == 0) { + return Resources(); + } + + return createRawDiskResource(info, capacity, profile, vendor); + }))); + } + + return collect(futures) + .then([](const vector<Resources>& resources) { + return accumulate(resources.begin(), resources.end(), Resources()); + }); +} + + void StorageLocalResourceProviderProcess::watchProfiles() { auto err = [](const string& message) { @@ -2180,88 +2245,43 @@ Future<Option<Error>> StorageLocalResourceProviderProcess::validateVolume( } -Future<Resources> StorageLocalResourceProviderProcess::listVolumes() +Future<vector<VolumeInfo>> StorageLocalResourceProviderProcess::listVolumes() { - CHECK(info.has_id()); - - // This is only used for reconciliation so no failure is returned. 
if (!controllerCapabilities->listVolumes) { - return Resources(); + return vector<VolumeInfo>(); } - // TODO(chhsiao): Set the max entries and use a loop to do - // multiple `ListVolumes` calls. - return call<csi::v0::LIST_VOLUMES>( - csi::CONTROLLER_SERVICE, csi::v0::ListVolumesRequest()) - .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) { - Resources resources; - - // Recover disk profiles from the checkpointed state. - hashmap<string, string> volumesToProfiles; - foreach (const Resource& resource, totalResources) { - if (resource.disk().source().has_id() && - resource.disk().source().has_profile()) { - volumesToProfiles.put( - resource.disk().source().id(), - resource.disk().source().profile()); - } - } - + // TODO(chhsiao): Set the max entries and use a loop to do multiple + // `ListVolumes` calls. + return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest()) + .then(process::defer(self(), [](const ListVolumesResponse& response) { + vector<VolumeInfo> result; foreach (const auto& entry, response.entries()) { - resources += createRawDiskResource( - info, - Bytes(entry.volume().capacity_bytes()), - volumesToProfiles.contains(entry.volume().id()) - ? volumesToProfiles.at(entry.volume().id()) - : Option<string>::none(), - vendor, - entry.volume().id(), - entry.volume().attributes().empty() - ? Option<Labels>::none() - : convertStringMapToLabels(entry.volume().attributes())); + result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()), + entry.volume().id(), + entry.volume().attributes()}); } - return resources; + return result; })); } -Future<Resources> StorageLocalResourceProviderProcess::getCapacities() +Future<Bytes> StorageLocalResourceProviderProcess::getCapacity( + const types::VolumeCapability& capability, + const Map<string, string>& parameters) { - CHECK(info.has_id()); - - // This is only used for reconciliation so no failure is returned. 
if (!controllerCapabilities->getCapacity) { - return Resources(); + return Bytes(0); } - vector<Future<Resources>> futures; - - foreachpair (const string& profile, - const DiskProfileAdaptor::ProfileInfo& profileInfo, - profileInfos) { - csi::v0::GetCapacityRequest request; - *request.add_volume_capabilities() = - csi::v0::evolve(profileInfo.capability); - *request.mutable_parameters() = profileInfo.parameters; - - futures.push_back( - call<csi::v0::GET_CAPACITY>( - csi::CONTROLLER_SERVICE, std::move(request)) - .then(defer(self(), [=]( - const csi::v0::GetCapacityResponse& response) -> Resources { - if (response.available_capacity() == 0) { - return Resources(); - } - - return createRawDiskResource( - info, Bytes(response.available_capacity()), profile, vendor); - }))); - } + GetCapacityRequest request; + *request.add_volume_capabilities() = csi::v0::evolve(capability); + *request.mutable_parameters() = parameters; - return collect(futures) - .then([](const vector<Resources>& resources) { - return accumulate(resources.begin(), resources.end(), Resources()); + return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request)) + .then([](const GetCapacityResponse& response) { + return Bytes(response.available_capacity()); }); } diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp index b298a8e..56d3682 100644 --- a/src/resource_provider/storage/provider_process.hpp +++ b/src/resource_provider/storage/provider_process.hpp @@ -160,6 +160,9 @@ private: const Resources& checkpointed, const Resources& discovered); + process::Future<Resources> getRawVolumes(); + process::Future<Resources> getStoragePools(); + // Spawns a loop to watch for changes in the set of known profiles and update // the profile mapping and storage pools accordingly. 
void watchProfiles(); @@ -249,10 +252,12 @@ private: const google::protobuf::Map<std::string, std::string>& parameters); // NOTE: This can only be called after `prepareServices`. - process::Future<Resources> listVolumes(); + process::Future<std::vector<csi::VolumeInfo>> listVolumes(); // NOTE: This can only be called after `prepareServices`. - process::Future<Resources> getCapacities(); + process::Future<Bytes> getCapacity( + const csi::types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters); // Applies the operation. Speculative operations will be synchronously // applied. Do nothing if the operation is already in a terminal state.