Initialized offer operation status update manager in SLRP. This patch adds an agent filesystem layout for checkpointing offer operation status updates for resource providers, and initializes a status update manager in the storage local resource provider.
Review: https://reviews.apache.org/r/64475/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3100e9aa Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3100e9aa Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3100e9aa Branch: refs/heads/master Commit: 3100e9aa0ac9b6bcc92643b145e2730fc862ea39 Parents: c728f8e Author: Chun-Hung Hsiao <[email protected]> Authored: Wed Dec 13 16:02:33 2017 -0800 Committer: Greg Mann <[email protected]> Committed: Wed Dec 13 17:05:57 2017 -0800 ---------------------------------------------------------------------- src/resource_provider/storage/provider.cpp | 130 +++++++++++++++++++----- src/slave/paths.cpp | 54 ++++++++++ src/slave/paths.hpp | 23 +++++ 3 files changed, 179 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index e806f44..2fd4a3b 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -66,6 +66,8 @@ #include "slave/paths.hpp" #include "slave/state.hpp" +#include "status_update_manager/offer_operation.hpp" + namespace http = process::http; using std::accumulate; @@ -327,6 +329,7 @@ private: Future<Nothing> recover(); Future<Nothing> recoverServices(); Future<Nothing> recoverVolumes(); + Future<Nothing> recoverStatusUpdates(); void doReliableRegistration(); Future<Nothing> reconcile(); @@ -359,8 +362,8 @@ private: Future<Nothing> deleteVolume(const string& volumeId); // Applies the offer operation. Conventional operations will be - // synchoronusly applied. - Future<Nothing> applyOfferOperation(const UUID& operationUuid); + // synchronously applied. 
+ Future<Nothing> _applyOfferOperation(const UUID& operationUuid); Future<vector<ResourceConversion>> applyCreateVolumeOrBlock( const Resource& resource, @@ -375,9 +378,15 @@ private: const Try<vector<ResourceConversion>>& conversions); void checkpointResourceProviderState(); - void sendResourceProviderStateUpdate(); void checkpointVolumeState(const string& volumeId); + void sendResourceProviderStateUpdate(); + + // NOTE: This is a callback for the status update manager and should + // not be called directly. + void sendOfferOperationStatusUpdate( + const OfferOperationStatusUpdate& statusUpdate); + enum State { RECOVERING, @@ -400,6 +409,7 @@ private: hashmap<string, ProfileData> profiles; process::grpc::client::Runtime runtime; Owned<v1::resource_provider::Driver> driver; + OfferOperationStatusUpdateManager statusUpdateManager; ContainerID controllerContainerId; ContainerID nodeContainerId; @@ -611,15 +621,15 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() // We replay all pending operations here, so that if a volume is // actually created before the last failover, it will be reflected // in the updated total resources before we do the reconciliation. - // NOTE: `applyOfferOperation` will remove the applied operation + // NOTE: `_applyOfferOperation` will remove the applied operation // from the list of pending operations, so we make a copy of keys // here. foreach (const UUID& uuid, pendingOperations.keys()) { - applyOfferOperation(uuid) + _applyOfferOperation(uuid) .onAny(defer(self(), [=](const Future<Nothing>& future) { if (!future.isReady()) { LOG(ERROR) - << "Failed to apply operation " << uuid << ": " + << "Failed to apply offer operation with UUID " << uuid << ": " << (future.isFailed() ? 
future.failure() : "future discarded"); } })); @@ -841,6 +851,27 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes() } +Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates() +{ + CHECK(info.has_id()); + + const string resourceProviderDir = slave::paths::getResourceProviderPath( + metaDir, slaveId, info.type(), info.name(), info.id()); + + statusUpdateManager.initialize( + defer(self(), &Self::sendOfferOperationStatusUpdate, lambda::_1), + std::bind( + &slave::paths::getOfferOperationUpdatesPath, + resourceProviderDir, + lambda::_1)); + + statusUpdateManager.pause(); + + // TODO(chhsiao): Recover status updates. + return Nothing(); +} + + void StorageLocalResourceProviderProcess::doReliableRegistration() { if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) { @@ -872,7 +903,8 @@ void StorageLocalResourceProviderProcess::doReliableRegistration() Future<Nothing> StorageLocalResourceProviderProcess::reconcile() { - return importResources() + return recoverStatusUpdates() + .then(defer(self(), &Self::importResources)) .then(defer(self(), [=](Resources importedResources) { // NODE: If a resource in the checkpointed total resources is // missing in the imported resources, we will still keep it if it @@ -945,6 +977,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcile() } sendResourceProviderStateUpdate(); + statusUpdateManager.resume(); state = READY; @@ -1005,7 +1038,7 @@ void StorageLocalResourceProviderProcess::applyOfferOperation( pendingOperations[uuid.get()] = operation; checkpointResourceProviderState(); - applyOfferOperation(uuid.get()) + _applyOfferOperation(uuid.get()) .onAny(defer(self(), [=](const Future<Nothing>& future) { if (!future.isReady()) { LOG(ERROR) @@ -1942,8 +1975,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( if (volumes.contains(volumeInfo.id())) { // The resource provider failed over after the last - // `CreateVolume` call, but before the operation 
status - // was checkpointed. + // `CreateVolume` call, but before the offer operation + // status was checkpointed. CHECK_EQ(csi::state::VolumeState::CREATED, volumes.at(volumeInfo.id()).state.state()); } else { @@ -2027,7 +2060,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( } } else { // The resource provider failed over after the last `DeleteVolume` - // call, but before the operation status was checkpointed. + // call, but before the offer operation status was checkpointed. CHECK(!os::exists(volumePath)); } @@ -2038,7 +2071,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( } -Future<Nothing> StorageLocalResourceProviderProcess::applyOfferOperation( +Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation( const UUID& operationUuid) { Future<vector<ResourceConversion>> conversions; @@ -2205,7 +2238,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock( created = resource.disk().source().id(); } else { // We use the operation UUID as the name of the volume, so the same - // operation will create the same volume after recovery. + // offer operation will create the same volume after recovery. // TODO(chhsiao): Call `CreateVolume` sequentially with other create // or delete operations. // TODO(chhsiao): Send `UPDATE_STATE` for RAW resources. 
@@ -2389,8 +2422,8 @@ Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions( auto err = [](const UUID& operationUuid, const string& message) { LOG(ERROR) - << "Failed to send status update for offer operation " << operationUuid - << ": " << message; + << "Failed to send status update for offer operation with UUID " + << operationUuid << ": " << message; }; driver->send(evolve(call)) @@ -2426,8 +2459,28 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState() const string statePath = slave::paths::getResourceProviderStatePath( metaDir, slaveId, info.type(), info.name(), info.id()); - CHECK_SOME(slave::state::checkpoint(statePath, state)) - << "Failed to checkpoint resource provider state to '" << statePath << "'"; + Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state); + CHECK_SOME(checkpoint) + << "Failed to checkpoint resource provider state to '" << statePath << "': " + << checkpoint.error(); +} + + +void StorageLocalResourceProviderProcess::checkpointVolumeState( + const string& volumeId) +{ + const string statePath = csi::paths::getVolumeStatePath( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name(), + volumeId); + + Try<Nothing> checkpoint = + slave::state::checkpoint(statePath, volumes.at(volumeId).state); + + CHECK_SOME(checkpoint) + << "Failed to checkpoint volume state to '" << statePath << "':" + << checkpoint.error(); } @@ -2443,8 +2496,8 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate() const Event::ApplyOfferOperation& operation, pendingOperations) { // TODO(chhsiao): Maintain a list of terminated but unacknowledged - // offer operations in memory and reconstruc that during recovery - // by querying status update manager. + // offer operations in memory and reconstruct it during recovery + // by querying the status update manager. 
update->add_operations()->CopyFrom( protobuf::createOfferOperation( operation.info(), @@ -2475,17 +2528,38 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate() } -void StorageLocalResourceProviderProcess::checkpointVolumeState( - const string& volumeId) +void StorageLocalResourceProviderProcess::sendOfferOperationStatusUpdate( + const OfferOperationStatusUpdate& statusUpdate) { - const string statePath = csi::paths::getVolumeStatePath( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name(), - volumeId); + Call call; + call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS); + call.mutable_resource_provider_id()->CopyFrom(info.id()); - CHECK_SOME(slave::state::checkpoint(statePath, volumes.at(volumeId).state)) - << "Failed to checkpoint volume state to '" << statePath << "'"; + Call::UpdateOfferOperationStatus* update = + call.mutable_update_offer_operation_status(); + update->set_operation_uuid(statusUpdate.operation_uuid()); + update->mutable_status()->CopyFrom(statusUpdate.status()); + + if (statusUpdate.has_framework_id()) { + update->mutable_framework_id()->CopyFrom(statusUpdate.framework_id()); + } + + // The latest status should have been set by the status update manager. 
+ CHECK(statusUpdate.has_latest_status()); + update->mutable_latest_status()->CopyFrom(statusUpdate.latest_status()); + + auto err = [](const UUID& uuid, const string& message) { + LOG(ERROR) + << "Failed to send status update for offer operation with UUID " << uuid + << ": " << message; + }; + + Try<UUID> uuid = UUID::fromBytes(statusUpdate.operation_uuid()); + CHECK_SOME(uuid); + + driver->send(evolve(call)) + .onFailed(std::bind(err, uuid.get(), lambda::_1)) + .onDiscarded(std::bind(err, uuid.get(), "future discarded")); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.cpp ---------------------------------------------------------------------- diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp index b8004e7..f9f0c78 100644 --- a/src/slave/paths.cpp +++ b/src/slave/paths.cpp @@ -65,6 +65,7 @@ const char TASK_UPDATES_FILE[] = "task.updates"; const char RESOURCES_INFO_FILE[] = "resources.info"; const char RESOURCES_TARGET_FILE[] = "resources.target"; const char RESOURCE_PROVIDER_STATE_FILE[] = "resource_provider.state"; +const char OFFER_OPERATION_UPDATES_FILE[] = "operation.updates"; const char CONTAINERS_DIR[] = "containers"; @@ -75,6 +76,7 @@ const char EXECUTORS_DIR[] = "executors"; const char EXECUTOR_RUNS_DIR[] = "runs"; const char RESOURCE_PROVIDER_REGISTRY[] = "resource_provider_registry"; const char RESOURCE_PROVIDERS_DIR[] = "resource_providers"; +const char OFFER_OPERATIONS_DIR[] = "operations"; Try<ExecutorRunPath> parseExecutorRunPath( @@ -545,6 +547,58 @@ string getLatestResourceProviderPath( } +Try<list<string>> getOfferOperationPaths( + const string& rootDir) +{ + return fs::list(path::join(rootDir, OFFER_OPERATIONS_DIR, "*")); +} + + +string getOfferOperationPath( + const string& rootDir, + const UUID& operationUuid) +{ + return path::join(rootDir, OFFER_OPERATIONS_DIR, operationUuid.toString()); +} + + +Try<UUID> parseOfferOperationPath( + const string& rootDir, + const string& dir) +{ + // TODO(chhsiao): Consider 
using `<regex>`, which requires GCC 4.9+. + + // Make sure there's a separator at the end of the prefix so that we + // don't accidentally slice off part of a directory. + const string prefix = path::join(rootDir, OFFER_OPERATIONS_DIR, ""); + + if (!strings::startsWith(dir, prefix)) { + return Error( + "Directory '" + dir + "' does not fall under operations directory '" + + prefix + "'"); + } + + Try<UUID> operationUuid = UUID::fromString(Path(dir).basename()); + if (operationUuid.isError()) { + return Error( + "Could not decode offer operation UUID from string '" + + Path(dir).basename() + "': " + operationUuid.error()); + } + + return operationUuid.get(); +} + + +string getOfferOperationUpdatesPath( + const string& rootDir, + const UUID& operationUuid) +{ + return path::join( + getOfferOperationPath(rootDir, operationUuid), + OFFER_OPERATION_UPDATES_FILE); +} + + string getResourcesInfoPath( const string& rootDir) { http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.hpp ---------------------------------------------------------------------- diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp index d645d87..bae68d0 100644 --- a/src/slave/paths.hpp +++ b/src/slave/paths.hpp @@ -23,6 +23,7 @@ #include <mesos/mesos.hpp> #include <stout/try.hpp> +#include <stout/uuid.hpp> namespace mesos { namespace internal { @@ -79,6 +80,9 @@ namespace paths { // | | |-- latest (symlink) // | | |-- <resource_provider_id> // | | |-- resource_provider.state +// | | |-- operations +// | | |-- <operation_uuid> +// | | |-- operation.updates // | |-- frameworks // | |-- <framework_id> // | |-- framework.info @@ -349,6 +353,25 @@ std::string getLatestResourceProviderPath( const std::string& resourceProviderName); +Try<std::list<std::string>> getOfferOperationPaths( + const std::string& rootDir); + + +std::string getOfferOperationPath( + const std::string& rootDir, + const UUID& operationUuid); + + +Try<UUID> parseOfferOperationPath( + const std::string& rootDir, + 
const std::string& dir); + + +std::string getOfferOperationUpdatesPath( + const std::string& rootDir, + const UUID& operationUuid); + + std::string getResourcesInfoPath( const std::string& rootDir);
