Repository: mesos
Updated Branches:
refs/heads/master 6dfb749c9 -> f031e031f
Import and reconcile resources from CSI plugins.
The following lists the steps to reconcile resources:
1. Import resources from the CSI plugin:
a. Get preprovisioned volumes:
ListVolumes
ValidateVolumeCapabilities for each volume
b. GetCapacity for each profile to get RAW resources
2. For each resource in the checkpointed resources:
a. Strip away metadata (reservation, persistence, etc.)
b. Any resources that remain after subtracting the stripped resources
from the imported resources are treated as new resources.
3. Report the reconciled resources through UPDATE_STATE
Review: https://reviews.apache.org/r/63022/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f031e031
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f031e031
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f031e031
Branch: refs/heads/master
Commit: f031e031f541212e6612a34d08899f16b8a1bc58
Parents: 6dfb749
Author: Chun-Hung Hsiao <[email protected]>
Authored: Wed Dec 6 12:20:59 2017 -0800
Committer: Jie Yu <[email protected]>
Committed: Wed Dec 6 14:03:01 2017 -0800
----------------------------------------------------------------------
src/resource_provider/storage/provider.cpp | 335 ++++++++++++++++++++++--
1 file changed, 313 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f031e031/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp
b/src/resource_provider/storage/provider.cpp
index dc77368..256b214 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -18,6 +18,7 @@
#include <algorithm>
#include <cctype>
+#include <numeric>
#include <glog/logging.h>
@@ -39,15 +40,13 @@
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
+#include <stout/os.hpp>
#include <stout/path.hpp>
-#include <stout/os/exists.hpp>
-#include <stout/os/mkdir.hpp>
#include <stout/os/realpath.hpp>
-#include <stout/os/rm.hpp>
-#include <stout/os/rmdir.hpp>
#include "common/http.hpp"
+#include "common/protobuf_utils.hpp"
#include "csi/client.hpp"
#include "csi/paths.hpp"
@@ -65,6 +64,7 @@
namespace http = process::http;
+using std::accumulate;
using std::find;
using std::list;
using std::queue;
@@ -88,6 +88,9 @@ using process::spawn;
using process::http::authentication::Principal;
+using mesos::internal::protobuf::convertLabelsToStringMap;
+using mesos::internal::protobuf::convertStringMapToLabels;
+
using mesos::internal::slave::ContainerDaemon;
using mesos::resource_provider::Call;
@@ -226,6 +229,38 @@ static inline http::Headers getAuthHeader(const
Option<string>& authToken)
}
+// Builds a scalar `disk` resource whose disk source type is `RAW` and
+// that belongs to the resource provider `resourceProviderId`.
+//
+// `capacity` is stored verbatim as the scalar value. The `profile`,
+// `id` and `metadata` fields of `Resource.disk.source` are set only
+// when the corresponding argument is `Some`; otherwise the protobuf
+// field is left unset.
+static inline Resource createRawDiskResource(
+    const ResourceProviderID& resourceProviderId,
+    double capacity,
+    const Option<string>& profile,
+    const Option<string>& id = None(),
+    const Option<Labels>& metadata = None())
+{
+  Resource resource;
+  resource.set_name("disk");
+  resource.set_type(Value::SCALAR);
+  resource.mutable_scalar()->set_value(capacity);
+  resource.mutable_provider_id()->CopyFrom(resourceProviderId);
+  resource.mutable_disk()->mutable_source()
+    ->set_type(Resource::DiskInfo::Source::RAW);
+
+  if (profile.isSome()) {
+    resource.mutable_disk()->mutable_source()->set_profile(profile.get());
+  }
+
+  if (id.isSome()) {
+    resource.mutable_disk()->mutable_source()->set_id(id.get());
+  }
+
+  if (metadata.isSome()) {
+    resource.mutable_disk()->mutable_source()->mutable_metadata()
+      ->CopyFrom(metadata.get());
+  }
+
+  return resource;
+}
+
+
class StorageLocalResourceProviderProcess
: public Process<StorageLocalResourceProviderProcess>
{
@@ -257,12 +292,19 @@ public:
void received(const Event& event);
private:
+ struct ProfileData
+ {
+ csi::VolumeCapability capability;
+ google::protobuf::Map<string, string> parameters;
+ };
+
void initialize() override;
void fatal(const string& messsage, const string& failure);
Future<Nothing> recover();
Future<Nothing> recoverServices();
void doReliableRegistration();
+ Future<Nothing> reconcile();
// Functions for received events.
void subscribed(const Event::Subscribed& subscribed);
@@ -275,8 +317,10 @@ private:
Future<Nothing> prepareControllerService();
Future<Nothing> prepareNodeService();
+ Future<Resources> importResources();
void checkpointResourceProviderState();
+ void sendResourceProviderStateUpdate();
enum State
{
@@ -296,6 +340,7 @@ private:
const Option<string> authToken;
csi::Version csiVersion;
+ hashmap<string, ProfileData> profiles;
process::grpc::client::Runtime runtime;
Owned<v1::resource_provider::Driver> driver;
@@ -311,7 +356,7 @@ private:
list<Event::Operation> pendingOperations;
Resources totalResources;
- string resourceVersion;
+ Option<UUID> resourceVersion;
};
@@ -394,10 +439,21 @@ void StorageLocalResourceProviderProcess::initialize()
}
}
+ // NOTE: The name of the default profile is an empty string, which is
+ // the default value for `Resource.disk.source.profile` when unset.
+ // TODO(chhsiao): Use the volume profile module.
+ ProfileData& defaultProfile = profiles[""];
+ defaultProfile.capability.mutable_mount();
+ defaultProfile.capability.mutable_access_mode()
+ ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+
const string message =
"Failed to recover resource provider with type '" + info.type() +
"' and name '" + info.name() + "'";
+ // NOTE: Most resource provider events rely on the plugins being
+ // prepared. To avoid race conditions, we connect to the agent after
+ // preparing the plugins.
recover()
.onFailed(defer(self(), &Self::fatal, message, lambda::_1))
.onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
@@ -423,10 +479,11 @@ Future<Nothing>
StorageLocalResourceProviderProcess::recover()
return recoverServices()
.then(defer(self(), [=]() -> Future<Nothing> {
- // Recover the resource provider ID from the latest symlink. If
- // the symlink does not exist or it points to a non-exist
- // directory, treat this as a new resource provider.
- // TODO(chhsiao): State recovery.
+ // Recover the resource provider ID and state from the latest
+ // symlink. If the symlink cannot be resolved, this is a new
+ // resource provider, and `totalResources` and `resourceVersion`
+ // will be empty, which is fine since they will be set up during
+ // reconciliation.
Result<string> realpath = os::realpath(
slave::paths::getLatestResourceProviderPath(
metaDir, slaveId, info.type(), info.name()));
@@ -438,9 +495,7 @@ Future<Nothing>
StorageLocalResourceProviderProcess::recover()
": " + realpath.error());
}
- if (realpath.isNone()) {
- resourceVersion = UUID::random().toBytes();
- } else {
+ if (realpath.isSome()) {
info.mutable_id()->set_value(Path(realpath.get()).basename());
const string statePath = slave::paths::getResourceProviderStatePath(
@@ -455,16 +510,19 @@ Future<Nothing>
StorageLocalResourceProviderProcess::recover()
"': " + resourceProviderState.error());
}
- if (resourceProviderState.isNone()) {
- resourceVersion = UUID::random().toBytes();
- } else {
+ if (resourceProviderState.isSome()) {
foreach (const Event::Operation& operation,
resourceProviderState->operations()) {
pendingOperations.push_back(operation);
}
totalResources = resourceProviderState->resources();
- resourceVersion = resourceProviderState->resource_version_uuid();
+
+ Try<UUID> uuid =
+ UUID::fromBytes(resourceProviderState->resource_version_uuid());
+ CHECK_SOME(uuid);
+
+ resourceVersion = uuid.get();
}
}
@@ -578,23 +636,75 @@ void
StorageLocalResourceProviderProcess::doReliableRegistration()
CHECK_EQ(CONNECTED, state);
- const string message =
- "Failed to subscribe resource provider with type '" + info.type() +
- "' and name '" + info.name() + "'";
-
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_resource_provider_info()->CopyFrom(info);
- driver->send(evolve(call));
+ auto err = [](const ResourceProviderInfo& info, const string& message) {
+ LOG(ERROR)
+ << "Failed to subscribe resource provider with type '" << info.type()
+ << "' and name '" << info.name() << "': " << message;
+ };
+
+ driver->send(evolve(call))
+ .onFailed(std::bind(err, info, lambda::_1))
+ .onDiscarded(std::bind(err, info, "future discarded"));
// TODO(chhsiao): Consider doing an exponential backoff.
delay(Seconds(1), self(), &Self::doReliableRegistration);
}
+// Reconciles the checkpointed resources with what the CSI plugin
+// currently reports:
+//   1. Import the resources from the plugin.
+//   2. Strip metadata from the checkpointed `totalResources` and
+//      subtract the result from the imported resources; whatever
+//      remains is new and is added with the default reservations of
+//      the resource provider.
+//   3. If the total changed, or no resource version exists yet,
+//      store the new total with a fresh resource version and
+//      checkpoint it.
+//   4. Send the state through `UPDATE_STATE` and move to `READY`.
+Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
+{
+  return importResources()
+    .then(defer(self(), [=](Resources imported) {
+      // NOTE: We do not support decreasing the total resources for now.
+      // Any resource in the checkpointed state will be reported to the
+      // resource provider manager, even if it is missing in the
+      // imported resources from the plugin. Additional resources from
+      // the plugin will be reported with the default reservation.
+
+      // Reduce every checkpointed resource to a plain RAW disk
+      // resource so that it is comparable with the imported ones.
+      //
+      // NOTE: The source ID is optional here. RAW resources derived
+      // from `GetCapacity` in `importResources` are created without an
+      // ID and are kept in `totalResources`, so asserting on
+      // `has_id()` would abort the next reconciliation.
+      Resources stripped;
+      foreach (const Resource& resource, totalResources) {
+        stripped += createRawDiskResource(
+            resource.provider_id(),
+            resource.scalar().value(),
+            resource.disk().source().has_profile()
+              ? resource.disk().source().profile() : Option<string>::none(),
+            resource.disk().source().has_id()
+              ? resource.disk().source().id() : Option<string>::none(),
+            resource.disk().source().has_metadata()
+              ? resource.disk().source().metadata() : Option<Labels>::none());
+      }
+
+      // Everything the plugin reports beyond the stripped checkpointed
+      // resources is new.
+      Resources result = totalResources;
+      foreach (Resource resource, imported - stripped) {
+        resource.mutable_reservations()->CopyFrom(info.default_reservations());
+        result += resource;
+
+        LOG(INFO) << "Adding new resource '" << resource << "'";
+      }
+
+      // Only bump the resource version and checkpoint when there is
+      // something new to persist.
+      if (resourceVersion.isNone() || result != totalResources) {
+        totalResources = result;
+        resourceVersion = UUID::random();
+        checkpointResourceProviderState();
+      }
+
+      sendResourceProviderStateUpdate();
+
+      state = READY;
+
+      return Nothing();
+    }));
+}
+
+
void StorageLocalResourceProviderProcess::subscribed(
const Event::Subscribed& subscribed)
{
@@ -614,6 +724,15 @@ void StorageLocalResourceProviderProcess::subscribed(
info.name(),
info.id());
}
+
+ const string message =
+ "Failed to update state for resource provider " + stringify(info.id());
+
+ // Reconcile resources after obtaining the resource provider ID.
+ // TODO(chhsiao): Do the reconciliation early.
+ reconcile()
+ .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+ .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
}
@@ -1010,6 +1129,131 @@ Future<Nothing>
StorageLocalResourceProviderProcess::prepareNodeService()
}
+// Returns resources reported by the CSI plugin, which are unreserved
+// raw disk resources without any persistent volume.
+//
+// The import runs in two stages:
+//   1. If the controller service supports `ListVolumes`, every listed
+//      volume becomes a RAW disk resource that carries the volume ID,
+//      its attributes (as metadata) and, when the volume is already
+//      present in the checkpointed `totalResources`, its profile.
+//   2. Each of those volumes is checked with
+//      `ValidateVolumeCapabilities`; in addition, if the controller
+//      service supports `GetCapacity`, the available capacity of each
+//      entry in `profiles` is reported as a RAW disk resource without
+//      an ID.
+// The per-call results are combined with `collect` and summed up. A
+// volume that does not support the capability of its profile turns
+// into a `Failure`.
+Future<Resources> StorageLocalResourceProviderProcess::importResources()
+{
+ // NOTE: This can only be called after `prepareControllerService` and
+ // the resource provider ID has been obtained.
+ CHECK_SOME(controllerCapabilities);
+ CHECK(info.has_id());
+
+ // Stage 1: RAW resources for the volumes the plugin already knows.
+ Future<Resources> preprovisioned;
+
+ if (controllerCapabilities->listVolumes) {
+ preprovisioned = getService(controllerContainerId)
+ .then(defer(self(), [=](csi::Client client) {
+ // TODO(chhsiao): Set the max entries and use a loop to do
+ // multiple `ListVolumes` calls.
+ csi::ListVolumesRequest request;
+ request.mutable_version()->CopyFrom(csiVersion);
+
+ return client.ListVolumes(request)
+ .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
+ Resources resources;
+
+ // Recover volume profiles from the checkpointed state.
+ hashmap<string, string> volumesToProfiles;
+ foreach (const Resource& resource, totalResources) {
+ if (resource.disk().source().has_id() &&
+ resource.disk().source().has_profile()) {
+ volumesToProfiles[resource.disk().source().id()] =
+ resource.disk().source().profile();
+ }
+ }
+
+ // NOTE(review): `capacity_bytes()` is used as the scalar value
+ // as is; confirm this is the unit expected for disk resources.
+ foreach (const auto& entry, response.entries()) {
+ resources += createRawDiskResource(
+ info.id(),
+ entry.volume_info().capacity_bytes(),
+ volumesToProfiles.contains(entry.volume_info().id())
+ ? volumesToProfiles.at(entry.volume_info().id())
+ : Option<string>::none(),
+ entry.volume_info().id(),
+ entry.volume_info().attributes().empty()
+ ? Option<Labels>::none()
+ : convertStringMapToLabels(
+ entry.volume_info().attributes()));
+ }
+
+ return resources;
+ }));
+ }));
+ } else {
+ // Without `ListVolumes` there are no preprovisioned volumes to import.
+ preprovisioned = Resources();
+ }
+
+ // Stage 2: validate the listed volumes and query the free capacity.
+ return preprovisioned
+ .then(defer(self(), [=](const Resources& preprovisioned) {
+ list<Future<Resources>> futures;
+
+ // One `ValidateVolumeCapabilities` call per preprovisioned volume.
+ foreach (const Resource& resource, preprovisioned) {
+ futures.push_back(getService(controllerContainerId)
+ .then(defer(self(), [=](csi::Client client) {
+ csi::ValidateVolumeCapabilitiesRequest request;
+ request.mutable_version()->CopyFrom(csiVersion);
+ request.set_volume_id(resource.disk().source().id());
+
+ // The default profile is used if `profile` is unset.
+ // NOTE(review): `at` assumes the profile recorded for the volume
+ // is present in `profiles` -- confirm.
+ request.add_volume_capabilities()->CopyFrom(
+ profiles.at(resource.disk().source().profile()).capability);
+
+ // NOTE(review): the conversion result is unwrapped without an
+ // error check -- confirm it cannot fail for plugin attributes.
+ if (resource.disk().source().has_metadata()) {
+ request.mutable_volume_attributes()->swap(
+ convertLabelsToStringMap(
+ resource.disk().source().metadata()).get());
+ }
+
+ return client.ValidateVolumeCapabilities(request)
+ .then(defer(self(), [=](
+ const csi::ValidateVolumeCapabilitiesResponse& response)
+ -> Future<Resources> {
+ if (!response.supported()) {
+ return Failure(
+ "Unsupported volume capability for resource " +
+ stringify(resource) + ": " + response.message());
+ }
+
+ return resource;
+ }));
+ })));
+ }
+
+ // One `GetCapacity` call per known profile.
+ if (controllerCapabilities->getCapacity) {
+ foreachkey (const string& profile, profiles) {
+ futures.push_back(getService(controllerContainerId)
+ .then(defer(self(), [=](csi::Client client) {
+ csi::GetCapacityRequest request;
+ request.mutable_version()->CopyFrom(csiVersion);
+ request.add_volume_capabilities()
+ ->CopyFrom(profiles.at(profile).capability);
+ *request.mutable_parameters() = profiles.at(profile).parameters;
+
+ return client.GetCapacity(request)
+ .then(defer(self(), [=](
+ const csi::GetCapacityResponse& response)
+ -> Future<Resources> {
+ // Nothing to report when the plugin has no capacity left.
+ if (response.available_capacity() == 0) {
+ return Resources();
+ }
+
+ // The empty profile name stands for the default profile, which
+ // is reported with the `profile` field unset.
+ return createRawDiskResource(
+ info.id(),
+ response.available_capacity(),
+ profile.empty() ? Option<string>::none() : profile);
+ }));
+ })));
+ }
+ }
+
+ // Sum up the resources produced by all the calls above.
+ return collect(futures)
+ .then(defer(self(), [=](const list<Resources>& resources) {
+ return accumulate(resources.begin(), resources.end(), Resources());
+ }));
+ }));
+}
+
+
void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
{
ResourceProviderState state;
@@ -1019,7 +1263,9 @@ void
StorageLocalResourceProviderProcess::checkpointResourceProviderState()
}
state.mutable_resources()->CopyFrom(totalResources);
- state.set_resource_version_uuid(resourceVersion);
+
+ CHECK_SOME(resourceVersion);
+ state.set_resource_version_uuid(resourceVersion->toBytes());
const string statePath = slave::paths::getResourceProviderStatePath(
metaDir, slaveId, info.type(), info.name(), info.id());
@@ -1029,6 +1275,51 @@ void
StorageLocalResourceProviderProcess::checkpointResourceProviderState()
}
+// Builds an `UPDATE_STATE` call that carries every pending operation
+// (reported as `OFFER_OPERATION_PENDING`), the current total
+// resources and the current resource version, then hands the call to
+// the driver. A failed or discarded send is only logged.
+void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
+{
+  Call call;
+  call.set_type(Call::UPDATE_STATE);
+  call.mutable_resource_provider_id()->CopyFrom(info.id());
+
+  Call::UpdateState* updateState = call.mutable_update_state();
+
+  // TODO(chhsiao): Maintain a list of terminated but unacknowledged
+  // offer operations in memory and reconstruct that during recovery
+  // by querying status update manager.
+  foreach (const Event::Operation& pending, pendingOperations) {
+    Try<UUID> uuid = UUID::fromBytes(pending.operation_uuid());
+    CHECK_SOME(uuid);
+
+    // The operation ID is optional, so forward it only when present.
+    const Option<OfferOperationID> operationId = pending.info().has_id()
+      ? Option<OfferOperationID>(pending.info().id())
+      : None();
+
+    updateState->add_operations()->CopyFrom(
+        protobuf::createOfferOperation(
+            pending.info(),
+            protobuf::createOfferOperationStatus(
+                OFFER_OPERATION_PENDING,
+                operationId),
+            pending.framework_id(),
+            slaveId,
+            uuid.get()));
+  }
+
+  updateState->mutable_resources()->CopyFrom(totalResources);
+
+  CHECK_SOME(resourceVersion);
+  updateState->set_resource_version_uuid(resourceVersion->toBytes());
+
+  // Errors from the driver are logged rather than propagated.
+  auto logFailure = [](const ResourceProviderID& id, const string& message) {
+    LOG(ERROR)
+      << "Failed to update state for resource provider " << id << ": "
+      << message;
+  };
+
+  driver->send(evolve(call))
+    .onFailed(std::bind(logFailure, info.id(), lambda::_1))
+    .onDiscarded(std::bind(logFailure, info.id(), "future discarded"));
+}
+
+
Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
const http::URL& url,
const string& workDir,