This is an automated email from the ASF dual-hosted git repository. bbannier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit deeae143ec0c2cc21137058a0f848a7081f85062 Author: Benjamin Bannier <bbann...@apache.org> AuthorDate: Wed Aug 14 09:33:43 2019 +0200 Performed periodic storage local provider reconciliations. Review: https://reviews.apache.org/r/71151/ --- src/resource_provider/storage/provider.cpp | 120 +++++++++---- .../storage_local_resource_provider_tests.cpp | 195 ++++++++++++++++++++- 2 files changed, 279 insertions(+), 36 deletions(-) diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 2f91fe0..f180af8 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -38,6 +38,7 @@ #include <mesos/v1/resource_provider.hpp> +#include <process/after.hpp> #include <process/collect.hpp> #include <process/defer.hpp> #include <process/delay.hpp> @@ -79,6 +80,7 @@ #include "internal/devolve.hpp" #include "internal/evolve.hpp" +#include "resource_provider/constants.hpp" #include "resource_provider/detector.hpp" #include "resource_provider/state.hpp" @@ -130,6 +132,7 @@ using mesos::internal::protobuf::convertLabelsToStringMap; using mesos::internal::protobuf::convertStringMapToLabels; using mesos::resource_provider::Call; +using mesos::resource_provider::DEFAULT_STORAGE_RECONCILIATION_INTERVAL; using mesos::resource_provider::Event; using mesos::resource_provider::ResourceProviderState; @@ -274,8 +277,10 @@ private: Future<Nothing> reconcileResourceProviderState(); Future<Nothing> reconcileOperationStatuses(); - // Query the plugin for its resources and update the providers state. - Future<Nothing> reconcileResources(); + // Query the plugin for its resources and update the providers + // state. If `alwaysUpdate` is `true` an update will always be + // sent, even if no changes are detected. 
+ Future<Nothing> reconcileResources(bool alwaysUpdate); ResourceConversion computeConversion( const Resources& checkpointed, const Resources& discovered) const; @@ -311,6 +316,13 @@ private: const Event::AcknowledgeOperationStatus& acknowledge); void reconcileOperations(const Event::ReconcileOperations& reconcile); + // Periodically poll the provider for resource changes. The poll interval is + // controlled by + // `ResourceProviderInfo.Storage.reconciliation_interval_seconds`. When this + // function is invoked it will perform the first poll after one reconciliation + // interval. + void watchResources(); + // Applies the operation. Speculative operations will be synchronously // applied. Do nothing if the operation is already in a terminal state. Future<Nothing> _applyOperation(const id::UUID& operationUuid); @@ -373,6 +385,8 @@ private: const Option<string> authToken; const bool strict; + const Duration reconciliationInterval; + shared_ptr<DiskProfileAdaptor> diskProfileAdaptor; Owned<Driver> driver; @@ -442,6 +456,10 @@ StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess( slaveId(_slaveId), authToken(_authToken), strict(_strict), + reconciliationInterval( + _info.storage().has_reconciliation_interval_seconds() + ? Seconds(info.storage().reconciliation_interval_seconds()) + : DEFAULT_STORAGE_RECONCILIATION_INTERVAL), metrics("resource_providers/" + info.type() + "." 
+ info.name() + "/"), resourceVersion(id::UUID::random()), sequence("storage-local-resource-provider-sequence") @@ -716,24 +734,73 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileResourceProviderState() { return reconcileOperationStatuses() - .then(defer(self(), &Self::reconcileResources)); + .then(defer(self(), &Self::reconcileResources, true)) + .then(defer(self(), [this] { + statusUpdateManager.resume(); + + switch (state) { + case RECOVERING: + case DISCONNECTED: + case CONNECTED: + case SUBSCRIBED: { + LOG(INFO) << "Resource provider " << info.id() + << " is in READY state"; + + state = READY; + } + case READY: + break; + } + + return Nothing(); + })); } -Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources() +void StorageLocalResourceProviderProcess::watchResources() +{ + // A specified reconciliation interval of zero + // denotes disabled periodic reconciliations. + if (reconciliationInterval == Seconds(0)) { + return; + } + + CHECK(info.has_id()); + + loop( + self(), + std::bind(&process::after, reconciliationInterval), + [this](const Nothing&) { + // Poll resource provider state in `sequence` to + // prevent concurrent non-reconcilable operations. 
+ reconciled = sequence.add(std::function<Future<Nothing>()>( + defer(self(), &Self::reconcileResources, false))); + + return reconciled.then( + [](const Nothing&) -> ControlFlow<Nothing> { return Continue(); }); + }); +} + + +Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources( + bool alwaysUpdate) { LOG(INFO) << "Reconciling storage pools and volumes"; + CHECK_PENDING(reconciled); + return collect<vector<ResourceConversion>>( {getExistingVolumes(), getStoragePools()}) .then(defer( - self(), [this](const vector<vector<ResourceConversion>>& collected) { + self(), + [alwaysUpdate, this] + (const vector<vector<ResourceConversion>>& collected) { Resources result = totalResources; foreach (const vector<ResourceConversion>& conversions, collected) { result = CHECK_NOTERROR(result.apply(conversions)); } - bool shouldSendUpdate = false; + bool shouldSendUpdate = alwaysUpdate; if (result != totalResources) { LOG(INFO) << "Removing '" << (totalResources - result) @@ -742,33 +809,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources() // Update the resource version since the total resources changed. totalResources = result; - resourceVersion = id::UUID::random(); checkpointResourceProviderState(); shouldSendUpdate = true; } - switch (state) { - case RECOVERING: - case DISCONNECTED: - case CONNECTED: - case SUBSCRIBED: { - LOG(INFO) << "Resource provider " << info.id() - << " is in READY state"; - - state = READY; - - // This is the first resource update of the current subscription. 
- shouldSendUpdate = true; - } - case READY: - break; - } - if (shouldSendUpdate) { sendResourceProviderStateUpdate(); - statusUpdateManager.resume(); } return Nothing(); @@ -1168,7 +1216,7 @@ void StorageLocalResourceProviderProcess::watchProfiles() std::function<Future<Nothing>()> update = defer(self(), [=] { return updateProfiles(profiles) - .then(defer(self(), &Self::reconcileResources)); + .then(defer(self(), &Self::reconcileResources, false)); }); // Update the profile mapping and storage pools in `sequence` to wait @@ -1261,10 +1309,12 @@ void StorageLocalResourceProviderProcess::subscribed( // Reconcile resources after obtaining the resource provider ID and start // watching for profile changes after the reconciliation. // TODO(chhsiao): Reconcile and watch for profile changes early. - reconciled = reconcileResourceProviderState() - .onReady(defer(self(), &Self::watchProfiles)) - .onFailed(defer(self(), std::bind(die, lambda::_1))) - .onDiscarded(defer(self(), std::bind(die, "future discarded"))); + reconciled = + reconcileResourceProviderState() + .onReady(defer(self(), &Self::watchProfiles)) + .onReady(defer(self(), &Self::watchResources)) + .onFailed(defer(self(), std::bind(die, lambda::_1))) + .onDiscarded(defer(self(), std::bind(die, "future discarded"))); } @@ -1728,7 +1778,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk( // TODO(chhsiao): Consider calling `createVolume` sequentially with other // create or delete operations, and send an `UPDATE_STATE` for storage pools - // afterward. See MESOS-9254. + // afterward. 
Future<VolumeInfo> created; if (resource.disk().source().has_profile()) { created = volumeManager->createVolume( @@ -1866,7 +1916,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( reconciled = sequence .add(std::function<Future<Nothing>()>( - defer(self(), &Self::reconcileResources))) + defer(self(), &Self::reconcileResources, false))) .onFailed(std::bind(err, resource, lambda::_1)) .onDiscard(std::bind(err, resource, "future discarded")); } @@ -2123,6 +2173,12 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState() void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate() { + // Set a new resource version here since we typically send state + // updates when resources change. While this ensures we always have + // a new resource version whenever we set new state, with that this + // function is not idempotent anymore. + resourceVersion = id::UUID::random(); + Call call; call.set_type(Call::UPDATE_STATE); call.mutable_resource_provider_id()->CopyFrom(info.id()); diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 7624ea1..05daf2a 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -96,8 +96,6 @@ using mesos::internal::slave::ContainerDaemonProcess; using mesos::master::detector::MasterDetector; using mesos::master::detector::StandaloneMasterDetector; -using mesos::resource_provider::DEFAULT_STORAGE_RECONCILIATION_INTERVAL; - using process::Clock; using process::Future; using process::Owned; @@ -1643,8 +1641,6 @@ TEST_P(StorageLocalResourceProviderTest, ProfileDisappeared) // The resource provider will reconcile the storage pools to reclaim the // space freed by destroying a MOUNT disk of a disappeared profile, which // would in turn trigger another agent update and thus another allocation. - // - // TODO(chhsiao): This might change once MESOS-9254 is done. 
AWAIT_READY(offers); ASSERT_EQ(1, offers->offers_size()); @@ -6708,6 +6704,197 @@ TEST_P( } } + +// This test validates that the SLRP periodically +// reconciles resources with the CSI plugin. +TEST_P(StorageLocalResourceProviderTest, Update) +{ + Clock::pause(); + + const string profilesPath = path::join(sandbox.get(), "profiles.json"); + + ASSERT_SOME( + os::write(profilesPath, createDiskProfileMapping({{"test", None()}}))); + + loadUriDiskProfileAdaptorModule(profilesPath); + + const string mockCsiEndpoint = + "unix://" + path::join(sandbox.get(), "mock_csi.sock"); + + MockCSIPlugin plugin; + ASSERT_SOME(plugin.startup(mockCsiEndpoint)); + + constexpr Duration reconciliationInterval = Seconds(15); + + setupResourceProviderConfig( + Bytes(0), + None(), + mockCsiEndpoint, + None(), + None(), + reconciliationInterval); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + process::Queue<Nothing> getCapacityCalls; + process::Queue<Nothing> listVolumesCalls; + if (GetParam() == csi::v0::API_VERSION) { + EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>())) + .WillOnce(Invoke([&]( + grpc::ServerContext* context, + const csi::v0::ListVolumesRequest* request, + csi::v0::ListVolumesResponse* response) { + listVolumesCalls.put({}); + return grpc::Status::OK; + })) + .WillRepeatedly(Invoke([&]( + grpc::ServerContext* context, + const csi::v0::ListVolumesRequest* request, + csi::v0::ListVolumesResponse* response) { + csi::v0::Volume* volume = response->add_entries()->mutable_volume(); + volume->set_capacity_bytes(Bytes(1024).bytes()); + volume->set_id("volume1"); + + listVolumesCalls.put({}); + return grpc::Status::OK; + })); + + EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>())) + .WillOnce(Invoke([&]( + grpc::ServerContext* context, + const csi::v0::GetCapacityRequest* request, + csi::v0::GetCapacityResponse* response) { + 
getCapacityCalls.put({}); + return grpc::Status::OK; + })) + .WillRepeatedly(Invoke([&]( + grpc::ServerContext* context, + const csi::v0::GetCapacityRequest* request, + csi::v0::GetCapacityResponse* response) { + response->set_available_capacity(Bytes(1024).bytes()); + + getCapacityCalls.put({}); + return grpc::Status::OK; + })); + } else if (GetParam() == csi::v1::API_VERSION) { + EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>())) + .WillOnce(Invoke([&]( + grpc::ServerContext* context, + const csi::v1::ListVolumesRequest* request, + csi::v1::ListVolumesResponse* response) { + listVolumesCalls.put({}); + return grpc::Status::OK; + })) + .WillRepeatedly(Invoke([&]( + grpc::ServerContext* context, + const csi::v1::ListVolumesRequest* request, + csi::v1::ListVolumesResponse* response) { + csi::v1::Volume* volume = response->add_entries()->mutable_volume(); + volume->set_capacity_bytes(Bytes(1024).bytes()); + volume->set_volume_id("volume1"); + + listVolumesCalls.put({}); + return grpc::Status::OK; + })); + + EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v1::GetCapacityResponse*>())) + .WillOnce(Invoke([&]( + grpc::ServerContext* context, + const csi::v1::GetCapacityRequest* request, + csi::v1::GetCapacityResponse* response) { + getCapacityCalls.put({}); + return grpc::Status::OK; + })) + .WillRepeatedly(Invoke([&]( + grpc::ServerContext* context, + const csi::v1::GetCapacityRequest* request, + csi::v1::GetCapacityResponse* response) { + response->set_available_capacity(Bytes(1024).bytes()); + + getCapacityCalls.put({}); + return grpc::Status::OK; + })); + } + + Future<Nothing> listVolumes1 = listVolumesCalls.get(); + Future<Nothing> listVolumes2 = listVolumesCalls.get(); + + Future<Nothing> getCapacity1 = getCapacityCalls.get(); + Future<Nothing> getCapacity2 = getCapacityCalls.get(); + + + // Since the local resource provider daemon gets subscribed after the agent + // is registered, it is guaranteed that the slave will send two + // 
`UpdateSlaveMessage`s, where the latter one contains resources from + the storage local resource provider. After that a single update + will be sent since the underlying provider resources got changed. + // + // NOTE: The order of the three `FUTURE_PROTOBUF`s is reversed because + Google Mock will search the expectations in reverse order. + Future<UpdateSlaveMessage> updateSlave3 = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + Future<UpdateSlaveMessage> updateSlave2 = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + Future<UpdateSlaveMessage> updateSlave1 = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + // Advance the clock to trigger agent registration and prevent retry. + Clock::advance(slaveFlags.registration_backoff_factor); + + AWAIT_READY(updateSlave1); + ASSERT_TRUE(updateSlave1->has_resource_providers()); + ASSERT_TRUE(updateSlave1->resource_providers().providers().empty()); + + // NOTE: We need to resume the clock so that the resource provider can + // periodically check if the CSI endpoint socket has been created by + // the plugin container, which runs in another Linux process. + Clock::resume(); + + AWAIT_READY(getCapacity1); + AWAIT_READY(listVolumes1); + + AWAIT_READY(updateSlave2); + ASSERT_TRUE(updateSlave2->has_resource_providers()); + ASSERT_FALSE(updateSlave2->resource_providers().providers().empty()); + + Clock::pause(); + + // Advance the clock so the SLRP polls for volume and storage pool updates. + Clock::settle(); + Clock::advance(reconciliationInterval); + + AWAIT_READY(listVolumes2); + AWAIT_READY(getCapacity2); + ASSERT_TRUE(updateSlave3.isPending()); + + // Advance the clock so the SLRP polls again. 
+ Future<Nothing> listVolumes3 = listVolumesCalls.get(); + Future<Nothing> getCapacity3 = getCapacityCalls.get(); + + Clock::settle(); + Clock::advance(reconciliationInterval); + + AWAIT_READY(listVolumes3); + AWAIT_READY(getCapacity3); + AWAIT_READY(updateSlave3); + ASSERT_TRUE(updateSlave3->has_resource_providers()); + ASSERT_FALSE(updateSlave3->resource_providers().providers().empty()); + + // Resource changes are reported and the resource version changes. + ASSERT_NE( + updateSlave2->resource_providers().providers(0).resource_version_uuid(), + updateSlave3->resource_providers().providers(0).resource_version_uuid()); +} + } // namespace tests { } // namespace internal { } // namespace mesos {