Moved SLRP resource recovery logic into a helper. This logically separates the Storage Local Resource Provider's recovery logic so that we can insert another step between recovering resources and starting the ResourceProvider Driver.
A subsequent patch will insert the intermediate recovery step(s). Review: https://reviews.apache.org/r/64615 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/454e7f2b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/454e7f2b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/454e7f2b Branch: refs/heads/master Commit: 454e7f2b7c9e87817109f51dae6356aa8f56e766 Parents: f4743f7 Author: Joseph Wu <[email protected]> Authored: Thu Dec 14 05:21:06 2017 -0800 Committer: Joseph Wu <[email protected]> Committed: Mon Dec 18 19:47:39 2017 -0800 ---------------------------------------------------------------------- src/resource_provider/storage/provider.cpp | 97 +++++++++++++------------ 1 file changed, 52 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/454e7f2b/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index b50de33..79d7f60 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -338,6 +338,7 @@ private: Future<Nothing> recover(); Future<Nothing> recoverServices(); Future<Nothing> recoverVolumes(); + Future<Nothing> recoverResources(); Future<Nothing> recoverStatusUpdates(); void doReliableRegistration(); Future<Nothing> reconcileResourceProviderState(); @@ -593,52 +594,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() return recoverServices() .then(defer(self(), &Self::recoverVolumes)) + .then(defer(self(), &Self::recoverResources)) .then(defer(self(), [=]() -> Future<Nothing> { - // Recover the resource provider ID and state from the latest - // symlink. If the symlink does not exist, this is a new resource - // provider, and the total resources will be empty, which is fine - // since new resources will be added during reconciliation. - Result<string> realpath = os::realpath( - slave::paths::getLatestResourceProviderPath( - metaDir, slaveId, info.type(), info.name())); - - if (realpath.isError()) { - return Failure( - "Failed to read the latest symlink for resource provider with " - "type '" + info.type() + "' and name '" + info.name() + "'" - ": " + realpath.error()); - } - - if (realpath.isSome()) { - info.mutable_id()->set_value(Path(realpath.get()).basename()); - - const string statePath = slave::paths::getResourceProviderStatePath( - metaDir, slaveId, info.type(), info.name(), info.id()); - - Result<ResourceProviderState> resourceProviderState = - ::protobuf::read<ResourceProviderState>(statePath); - - if (resourceProviderState.isError()) { - return Failure( - "Failed to read resource provider state from '" + statePath + - "': " + resourceProviderState.error()); - } - - if (resourceProviderState.isSome()) { - foreach (const OfferOperation& operation, - resourceProviderState->operations()) { - Try<id::UUID> uuid = - id::UUID::fromBytes(operation.operation_uuid().value()); - - CHECK_SOME(uuid); - - offerOperations[uuid.get()] = operation; - } - - totalResources = resourceProviderState->resources(); - } - } - state = DISCONNECTED; driver.reset(new Driver( @@ -857,6 +814,56 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes() } +Future<Nothing> StorageLocalResourceProviderProcess::recoverResources() +{ + // Recover the resource provider ID and state from the latest + // symlink. If the symlink does not exist, this is a new resource + // provider, and the total resources will be empty, which is fine + // since new resources will be added during reconciliation. + Result<string> realpath = os::realpath( + slave::paths::getLatestResourceProviderPath( + metaDir, slaveId, info.type(), info.name())); + + if (realpath.isError()) { + return Failure( + "Failed to read the latest symlink for resource provider with " + "type '" + info.type() + "' and name '" + info.name() + "'" + ": " + realpath.error()); + } + + if (realpath.isSome()) { + info.mutable_id()->set_value(Path(realpath.get()).basename()); + + const string statePath = slave::paths::getResourceProviderStatePath( + metaDir, slaveId, info.type(), info.name(), info.id()); + + Result<ResourceProviderState> resourceProviderState = + ::protobuf::read<ResourceProviderState>(statePath); + + if (resourceProviderState.isError()) { + return Failure( + "Failed to read resource provider state from '" + statePath + + "': " + resourceProviderState.error()); + } + + if (resourceProviderState.isSome()) { + foreach (const OfferOperation& operation, + resourceProviderState->operations()) { + Try<id::UUID> uuid = + id::UUID::fromBytes(operation.operation_uuid().value()); + + CHECK_SOME(uuid); + + offerOperations[uuid.get()] = operation; + } + + totalResources = resourceProviderState->resources(); + } + } + + return Nothing(); +} + Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates() { CHECK(info.has_id());
