Integrated the status update manager into the storage local resource provider (SLRP). This patch uses the status update manager to send status updates and to recover operations that are checkpointed as completed in the status update manager but are still in SLRP's pending list. It also forwards acknowledgements to the status update manager and garbage-collects the metadata of offer operations.
Review: https://reviews.apache.org/r/64551/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/065c74ce Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/065c74ce Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/065c74ce Branch: refs/heads/master Commit: 065c74ce47c4147d63ed96fb60d773c924d13a4b Parents: eeb09cb Author: Chun-Hung Hsiao <[email protected]> Authored: Wed Dec 13 18:57:20 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Wed Dec 13 18:57:20 2017 -0800 ---------------------------------------------------------------------- src/resource_provider/daemon.cpp | 21 ++- src/resource_provider/daemon.hpp | 3 +- src/resource_provider/local.cpp | 6 +- src/resource_provider/local.hpp | 3 +- src/resource_provider/storage/provider.cpp | 236 ++++++++++++++++++++---- src/resource_provider/storage/provider.hpp | 6 +- 6 files changed, 226 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/daemon.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp index 7c783e3..f160a87 100644 --- a/src/resource_provider/daemon.cpp +++ b/src/resource_provider/daemon.cpp @@ -77,12 +77,14 @@ public: const http::URL& _url, const string& _workDir, const Option<string>& _configDir, - SecretGenerator* _secretGenerator) + SecretGenerator* _secretGenerator, + bool _strict) : ProcessBase(process::ID::generate("local-resource-provider-daemon")), url(_url), workDir(_workDir), configDir(_configDir), - secretGenerator(_secretGenerator) {} + secretGenerator(_secretGenerator), + strict(_strict) {} LocalResourceProviderDaemonProcess( const LocalResourceProviderDaemonProcess& other) = delete; @@ -122,6 +124,7 @@ private: Future<Nothing> launch( const string& type, const string& name); 
+ Future<Nothing> _launch( const string& type, const string& name, @@ -134,6 +137,7 @@ private: const string workDir; const Option<string> configDir; SecretGenerator* const secretGenerator; + const bool strict; Option<SlaveID> slaveId; hashmap<string, hashmap<string, ProviderData>> providers; @@ -439,7 +443,7 @@ Future<Nothing> LocalResourceProviderDaemonProcess::_launch( } Try<Owned<LocalResourceProvider>> provider = LocalResourceProvider::create( - url, workDir, data.info, slaveId.get(), authToken); + url, workDir, data.info, slaveId.get(), authToken, strict); if (provider.isError()) { return Failure( @@ -506,7 +510,8 @@ Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create( url, flags.work_dir, configDir, - secretGenerator); + secretGenerator, + flags.strict); } @@ -514,12 +519,10 @@ LocalResourceProviderDaemon::LocalResourceProviderDaemon( const http::URL& url, const string& workDir, const Option<string>& configDir, - SecretGenerator* secretGenerator) + SecretGenerator* secretGenerator, + bool strict) : process(new LocalResourceProviderDaemonProcess( - url, - workDir, - configDir, - secretGenerator)) + url, workDir, configDir, secretGenerator, strict)) { spawn(CHECK_NOTNULL(process.get())); } http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/daemon.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/daemon.hpp b/src/resource_provider/daemon.hpp index 7c513a2..a6d0013 100644 --- a/src/resource_provider/daemon.hpp +++ b/src/resource_provider/daemon.hpp @@ -70,7 +70,8 @@ private: const process::http::URL& url, const std::string& workDir, const Option<std::string>& configDir, - SecretGenerator* secretGenerator); + SecretGenerator* secretGenerator, + bool strict); process::Owned<LocalResourceProviderDaemonProcess> process; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/local.cpp 
---------------------------------------------------------------------- diff --git a/src/resource_provider/local.cpp b/src/resource_provider/local.cpp index d1d6835..ae23c20 100644 --- a/src/resource_provider/local.cpp +++ b/src/resource_provider/local.cpp @@ -37,7 +37,8 @@ Try<Owned<LocalResourceProvider>> LocalResourceProvider::create( const string& workDir, const ResourceProviderInfo& info, const SlaveID& slaveId, - const Option<string>& authToken) + const Option<string>& authToken, + bool strict) { // TODO(jieyu): Document the built-in local resource providers. const hashmap<string, lambda::function<decltype(create)>> creators = { @@ -47,7 +48,8 @@ Try<Owned<LocalResourceProvider>> LocalResourceProvider::create( }; if (creators.contains(info.type())) { - return creators.at(info.type())(url, workDir, info, slaveId, authToken); + return creators.at(info.type())( + url, workDir, info, slaveId, authToken, strict); } return Error("Unknown local resource provider type '" + info.type() + "'"); http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/local.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/local.hpp b/src/resource_provider/local.hpp index 46111d9..20bcc78 100644 --- a/src/resource_provider/local.hpp +++ b/src/resource_provider/local.hpp @@ -36,7 +36,8 @@ public: const std::string& workDir, const ResourceProviderInfo& info, const SlaveID& slaveId, - const Option<std::string>& authToken); + const Option<std::string>& authToken, + bool strict); static Try<process::http::authentication::Principal> principal( const ResourceProviderInfo& info); http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 80bcea0..03a12c7 100644 --- 
a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -283,7 +283,8 @@ public: const string& _workDir, const ResourceProviderInfo& _info, const SlaveID& _slaveId, - const Option<string>& _authToken) + const Option<string>& _authToken, + bool _strict) : ProcessBase(process::ID::generate("storage-local-resource-provider")), state(RECOVERING), url(_url), @@ -293,6 +294,7 @@ public: info(_info), slaveId(_slaveId), authToken(_authToken), + strict(_strict), resourceVersion(UUID::random()) {} StorageLocalResourceProviderProcess( @@ -374,7 +376,9 @@ private: Future<vector<ResourceConversion>> applyDestroyVolumeOrBlock( const Resource& resource); - // Synchronously update `totalResources` and the offer operation status. + // Synchronously updates `totalResources` and the offer operation + // status and then asks the status update manager to send status + // updates. Try<Nothing> updateOfferOperationStatus( const UUID& operationUuid, const Try<vector<ResourceConversion>>& conversions); @@ -405,6 +409,7 @@ private: ResourceProviderInfo info; const SlaveID slaveId; const Option<string> authToken; + const bool strict; csi::Version csiVersion; string bootId; @@ -616,27 +621,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() } } - // We replay all pending operations here, so that if a volume is - // actually created or deleted before the last failover, it will - // be reflected in the total resources before reconciliation. 
- foreachpair (const UUID& uuid, - const OfferOperation& operation, - offerOperations) { - if (protobuf::isTerminalState(operation.latest_status().state())) { - continue; - } - - auto err = [](const UUID& uuid, const string& message) { - LOG(ERROR) - << "Falied to apply offer operation with UUID " << uuid << ": " - << message; - }; - - _applyOfferOperation(uuid) - .onFailed(std::bind(err, uuid, lambda::_1)) - .onDiscarded(std::bind(err, uuid, "future discarded")); - } - state = DISCONNECTED; driver.reset(new Driver( @@ -869,8 +853,116 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates() statusUpdateManager.pause(); - // TODO(chhsiao): Recover status updates. - return Nothing(); + Try<list<string>> operationPaths = slave::paths::getOfferOperationPaths( + slave::paths::getResourceProviderPath( + metaDir, slaveId, info.type(), info.name(), info.id())); + + if (operationPaths.isError()) { + return Failure( + "Failed to find offer operations for resource provider " + + stringify(info.id()) + ": " + operationPaths.error()); + } + + list<UUID> operationUuids; + foreach (const string& path, operationPaths.get()) { + Try<UUID> uuid = + slave::paths::parseOfferOperationPath(resourceProviderDir, path); + + if (uuid.isError()) { + return Failure( + "Failed to parse offer operation path '" + path + "': " + + uuid.error()); + } + + CHECK(offerOperations.contains(uuid.get())); + operationUuids.emplace_back(std::move(uuid.get())); + } + + return statusUpdateManager.recover(operationUuids, strict) + .then(defer(self(), [=]( + const OfferOperationStatusManagerState& statusUpdateManagerState) + -> Future<Nothing> { + using StreamState = + typename OfferOperationStatusManagerState::StreamState; + + // Clean up the operations that are terminated. 
+ foreachpair (const UUID& uuid, + const Option<StreamState>& stream, + statusUpdateManagerState.streams) { + if (stream.isSome() && stream->terminated) { + offerOperations.erase(uuid); + + // Garbage collect the offer operation metadata. + const string path = slave::paths::getOfferOperationPath( + slave::paths::getResourceProviderPath( + metaDir, slaveId, info.type(), info.name(), info.id()), + uuid); + + Try<Nothing> rmdir = os::rmdir(path); + if (rmdir.isError()) { + return Failure( + "Failed to remove directory '" + path + "': " + rmdir.error()); + } + } + } + + // Send updates for all missing statuses. + foreachpair (const UUID& uuid, + const OfferOperation& operation, + offerOperations) { + if (operation.latest_status().state() == OFFER_OPERATION_PENDING) { + continue; + } + + const int numStatuses = + statusUpdateManagerState.streams.contains(uuid) && + statusUpdateManagerState.streams.at(uuid).isSome() + ? statusUpdateManagerState.streams.at(uuid)->updates.size() : 0; + + for (int i = numStatuses; i < operation.statuses().size(); i++) { + OfferOperationStatusUpdate update = + protobuf::createOfferOperationStatusUpdate( + uuid, + operation.statuses(i), + None(), + operation.has_framework_id() + ? operation.framework_id() : Option<FrameworkID>::none(), + slaveId); + + const string message = + "Failed to update status of offer operation with UUID " + + stringify(uuid); + + statusUpdateManager.update(std::move(update)) + .onFailed(defer(self(), &Self::fatal, message, lambda::_1)) + .onDiscarded( + defer(self(), &Self::fatal, message, "future discarded")); + } + } + + // We replay all pending operations here, so that if a volume is + // created or deleted before the last failover, the result will be + // reflected in the total resources before reconciliation. 
+ foreachpair (const UUID& uuid, + const OfferOperation& operation, + offerOperations) { + if (protobuf::isTerminalState(operation.latest_status().state())) { + continue; + } + + auto err = [](const UUID& uuid, const string& message) { + LOG(ERROR) + << "Falied to apply offer operation with UUID " << uuid << ": " + << message; + }; + + _applyOfferOperation(uuid) + .onFailed(std::bind(err, uuid, lambda::_1)) + .onDiscarded(std::bind(err, uuid, "future discarded")); + } + + return Nothing(); + })); } @@ -1214,6 +1306,45 @@ void StorageLocalResourceProviderProcess::acknowledgeOfferOperation( const Event::AcknowledgeOfferOperation& acknowledge) { CHECK_EQ(READY, state); + + Try<UUID> operationUuid = UUID::fromBytes(acknowledge.operation_uuid()); + CHECK_SOME(operationUuid); + + Try<UUID> statusUuid = UUID::fromBytes(acknowledge.status_uuid()); + CHECK_SOME(statusUuid); + + auto err = [](const UUID& uuid, const string& message) { + LOG(ERROR) + << "Failed to acknowledge status update for offer operation with UUID " + << uuid << ": " << message; + }; + + // NOTE: It is possible that an incoming acknowledgement races with an + // outgoing retry of status update, and then a duplicated + // acknowledgement will be received. In this case, the following call + // will fail, so we just leave an error log. + statusUpdateManager.acknowledgement(operationUuid.get(), statusUuid.get()) + .then(defer(self(), [=](bool continuation) -> Future<Nothing> { + if (!continuation) { + offerOperations.erase(operationUuid.get()); + + // Garbage collect the offer operation metadata. 
+ const string path = slave::paths::getOfferOperationPath( + slave::paths::getResourceProviderPath( + metaDir, slaveId, info.type(), info.name(), info.id()), + operationUuid.get()); + + Try<Nothing> rmdir = os::rmdir(path); + if (rmdir.isError()) { + return Failure( + "Failed to remove directory '" + path + "': " + rmdir.error()); + } + } + + return Nothing(); + })) + .onFailed(std::bind(err, operationUuid.get(), lambda::_1)) + .onDiscarded(std::bind(err, operationUuid.get(), "future discarded")); } @@ -1221,6 +1352,36 @@ void StorageLocalResourceProviderProcess::reconcileOfferOperations( const Event::ReconcileOfferOperations& reconcile) { CHECK_EQ(READY, state); + + foreach (const string& operationUuid, reconcile.operation_uuids()) { + Try<UUID> uuid = UUID::fromBytes(operationUuid); + CHECK_SOME(uuid); + + if (offerOperations.contains(uuid.get())) { + // When the agent asks for reconciliation for a known operation, + // that means the `APPLY_OFFER_OPERATION` event races with the + // last `UPDATE_STATE` call and arrives after the call. Since the + // event is received, nothing needs to be done here. 
+ continue; + } + + OfferOperationStatusUpdate update = + protobuf::createOfferOperationStatusUpdate( + uuid.get(), + protobuf::createOfferOperationStatus( + OFFER_OPERATION_DROPPED, None(), None(), None(), UUID::random()), + None(), + None(), + slaveId); + + const string message = + "Failed to update status of offer operation with UUID " + + stringify(uuid.get()); + + statusUpdateManager.update(std::move(update)) + .onFailed(defer(self(), &Self::fatal, message, lambda::_1)) + .onDiscarded(defer(self(), &Self::fatal, message, "future discarded")); + } } @@ -2358,7 +2519,7 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus( convertedResources += conversion.converted; conversion.consumed.unallocate(); conversion.converted.unallocate(); - _conversions.push_back(std::move(conversion)); + _conversions.emplace_back(std::move(conversion)); } Try<Resources> result = totalResources.apply(_conversions); @@ -2399,17 +2560,22 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus( checkpointResourceProviderState(); // Send out the status update for the offer operation. - // TODO(chhsiao): Use the status update manager. OfferOperationStatusUpdate update = protobuf::createOfferOperationStatusUpdate( operationUuid, operation.latest_status(), - operation.latest_status(), + None(), operation.has_framework_id() ? operation.framework_id() : Option<FrameworkID>::none(), slaveId); - sendOfferOperationStatusUpdate(update); + const string message = + "Failed to update status of offer operation with UUID " + + stringify(operationUuid); + + statusUpdateManager.update(std::move(update)) + .onFailed(defer(self(), &Self::fatal, message, lambda::_1)) + .onDiscarded(defer(self(), &Self::fatal, message, "future discarded")); return error.isNone() ? 
Nothing() : Try<Nothing>::error(error.get()); } @@ -2519,7 +2685,8 @@ Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create( const string& workDir, const ResourceProviderInfo& info, const SlaveID& slaveId, - const Option<string>& authToken) + const Option<string>& authToken, + bool strict) { // Verify that the name follows Java package naming convention. // TODO(chhsiao): We should move this check to a validation function @@ -2571,8 +2738,8 @@ Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create( stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found"); } - return Owned<LocalResourceProvider>( - new StorageLocalResourceProvider(url, workDir, info, slaveId, authToken)); + return Owned<LocalResourceProvider>(new StorageLocalResourceProvider( + url, workDir, info, slaveId, authToken, strict)); } @@ -2590,9 +2757,10 @@ StorageLocalResourceProvider::StorageLocalResourceProvider( const string& workDir, const ResourceProviderInfo& info, const SlaveID& slaveId, - const Option<string>& authToken) + const Option<string>& authToken, + bool strict) : process(new StorageLocalResourceProviderProcess( - url, workDir, info, slaveId, authToken)) + url, workDir, info, slaveId, authToken, strict)) { spawn(CHECK_NOTNULL(process.get())); } http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/storage/provider.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.hpp b/src/resource_provider/storage/provider.hpp index 374f837..5a371b1 100644 --- a/src/resource_provider/storage/provider.hpp +++ b/src/resource_provider/storage/provider.hpp @@ -41,7 +41,8 @@ public: const std::string& workDir, const mesos::ResourceProviderInfo& info, const SlaveID& slaveId, - const Option<std::string>& authToken); + const Option<std::string>& authToken, + bool strict); static Try<process::http::authentication::Principal> principal( const mesos::ResourceProviderInfo& 
info); @@ -60,7 +61,8 @@ private: const std::string& workDir, const mesos::ResourceProviderInfo& info, const SlaveID& slaveId, - const Option<std::string>& authToken); + const Option<std::string>& authToken, + bool strict); process::Owned<StorageLocalResourceProviderProcess> process; };
