Added a call to update total resources and pending operations. Now a resource provider first `SUBSCRIBE` to the resource provider manager without resources to get its ID, then use the ID to prepare the checkpoints for recovery and persistent work directory, and then update its total resources and pending operations through `UPDATE_STATE`.
Review: https://reviews.apache.org/r/62903/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94907305 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94907305 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94907305 Branch: refs/heads/master Commit: 94907305d6caf0c2f3d2dab19a4068fd2df63354 Parents: d6ac32d Author: Chun-Hung Hsiao <[email protected]> Authored: Wed Oct 18 16:37:41 2017 -0700 Committer: Jie Yu <[email protected]> Committed: Sun Oct 29 15:57:28 2017 +0100 ---------------------------------------------------------------------- .../resource_provider/resource_provider.proto | 24 ++-- .../resource_provider/resource_provider.proto | 19 ++- src/resource_provider/manager.cpp | 66 ++++++---- src/resource_provider/validation.cpp | 12 ++ src/tests/resource_provider_manager_tests.cpp | 132 ++++++++++++------- .../resource_provider_validation_tests.cpp | 23 ++++ 6 files changed, 184 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/include/mesos/resource_provider/resource_provider.proto ---------------------------------------------------------------------- diff --git a/include/mesos/resource_provider/resource_provider.proto b/include/mesos/resource_provider/resource_provider.proto index ef67aa1..1b26f89 100644 --- a/include/mesos/resource_provider/resource_provider.proto +++ b/include/mesos/resource_provider/resource_provider.proto @@ -70,23 +70,12 @@ message Call { SUBSCRIBE = 1; // See 'Subscribe'. UPDATE_OFFER_OPERATION_STATUS = 2; // See 'UpdateOfferOperationStatus'. + UPDATE_STATE = 3; // See 'UpdateState'. } // Request to subscribe with the master. - // - // TODO(bbannier): Once we have implemented a call to update a - // resource provider, consider removing resources here and instead - // moving to a protocol where a resource provider first subscribes - // and then updates its resources. message Subscribe { required ResourceProviderInfo resource_provider_info = 1; - - // This includes pending operations and those that have - // unacknowledged statuses. - repeated OfferOperation operations = 3; - - // The total resources provided by this resource provider. - repeated Resource resources = 2; } // Notify the master about the status of an offer operation. @@ -101,6 +90,16 @@ message Call { required bytes operation_uuid = 4; } + // Notify the master about the total resources and pending operations. + message UpdateState { + // This includes pending operations and those that have + // unacknowledged statuses. + repeated OfferOperation operations = 1; + + // The total resources provided by this resource provider. + repeated Resource resources = 2; + } + // Identifies who generated this call. // The 'ResourceProviderManager' assigns a resource provider ID when // a new resource provider subscribes for the first time. Once @@ -114,4 +113,5 @@ message Call { optional Type type = 2; optional Subscribe subscribe = 3; optional UpdateOfferOperationStatus update_offer_operation_status = 4; + optional UpdateState update_state = 5; } http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/include/mesos/v1/resource_provider/resource_provider.proto ---------------------------------------------------------------------- diff --git a/include/mesos/v1/resource_provider/resource_provider.proto b/include/mesos/v1/resource_provider/resource_provider.proto index 10b77c7..3afdbb9 100644 --- a/include/mesos/v1/resource_provider/resource_provider.proto +++ b/include/mesos/v1/resource_provider/resource_provider.proto @@ -70,18 +70,12 @@ message Call { SUBSCRIBE = 1; // See 'Subscribe'. UPDATE_OFFER_OPERATION_STATUS = 2; // See 'UpdateOfferOperationStatus'. + UPDATE_STATE = 3; // See 'UpdateState'. } // Request to subscribe with the master. message Subscribe { required ResourceProviderInfo resource_provider_info = 1; - - // This includes pending operations and those that have - // unacknowledged statuses. - repeated OfferOperation operations = 3; - - // The total resources provided by this resource provider. - repeated Resource resources = 2; } // Notify the master about the status of an operation. @@ -96,6 +90,16 @@ message Call { required bytes operation_uuid = 4; } + // Notify the master about the total resources and pending operations. + message UpdateState { + // This includes pending operations and those that have + // unacknowledged statuses. + repeated OfferOperation operations = 1; + + // The total resources provided by this resource provider. + repeated Resource resources = 2; + } + // Identifies who generated this call. // The 'ResourceProviderManager' assigns a resource provider ID when // a new resource provider subscribes for the first time. Once @@ -109,4 +113,5 @@ message Call { optional Type type = 2; optional Subscribe subscribe = 3; optional UpdateOfferOperationStatus update_offer_operation_status = 4; + optional UpdateState update_state = 5; } http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/src/resource_provider/manager.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index 4713a2a..11f8901 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -120,11 +120,9 @@ struct ResourceProvider { ResourceProvider( const ResourceProviderInfo& _info, - const HttpConnection& _http, - const Resources& _resources) + const HttpConnection& _http) : info(_info), - http(_http), - resources(_resources) {} + http(_http) {} ResourceProviderInfo info; HttpConnection http; @@ -155,6 +153,10 @@ private: ResourceProvider* resourceProvider, const Call::UpdateOfferOperationStatus& update); + void updateState( + ResourceProvider* resourceProvider, + const Call::UpdateState& update); + ResourceProviderID newResourceProviderId(); }; @@ -289,6 +291,11 @@ Future<http::Response> ResourceProviderManagerProcess::api( return Accepted(); } + + case Call::UPDATE_STATE: { + updateState(&resourceProvider, call.update_state()); + return Accepted(); + } } UNREACHABLE(); @@ -301,16 +308,15 @@ void ResourceProviderManagerProcess::subscribe( { ResourceProviderInfo resourceProviderInfo = subscribe.resource_provider_info(); - resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId()); - // Inject the `ResourceProviderID` for all subscribed resources. - Resources resources; - foreach (Resource resource, subscribe.resources()) { - resource.mutable_provider_id()->CopyFrom(resourceProviderInfo.id()); - resources += resource; + // TODO(chhsiao): Reject the subscription if it contains an unknown ID + // or there is already a subscribed instance with the same ID, and add + // tests for re-subscriptions. + if (!resourceProviderInfo.has_id()) { + resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId()); } - ResourceProvider resourceProvider(resourceProviderInfo, http, resources); + ResourceProvider resourceProvider(resourceProviderInfo, http); Event event; event.set_type(Event::SUBSCRIBED); @@ -324,17 +330,6 @@ void ResourceProviderManagerProcess::subscribe( } resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider)); - - ResourceProviderMessage message; - message.type = ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES; - - ResourceProviderMessage::UpdateTotalResources updateTotalResources; - updateTotalResources.id = resourceProviderInfo.id(); - updateTotalResources.total = resources; - - message.updateTotalResources = std::move(updateTotalResources); - - messages.put(std::move(message)); } @@ -346,6 +341,33 @@ void ResourceProviderManagerProcess::updateOfferOperationStatus( } +void ResourceProviderManagerProcess::updateState( + ResourceProvider* resourceProvider, + const Call::UpdateState& update) +{ + Resources resources; + + foreach (const Resource& resource, update.resources()) { + CHECK_EQ(resource.provider_id(), resourceProvider->info.id()); + resources += resource; + } + + resourceProvider->resources = std::move(resources); + + // TODO(chhsiao): Report pending operations. + + ResourceProviderMessage::UpdateTotalResources updateTotalResources; + updateTotalResources.id = resourceProvider->info.id(); + updateTotalResources.total = resourceProvider->resources; + + ResourceProviderMessage message; + message.type = ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES; + message.updateTotalResources = std::move(updateTotalResources); + + messages.put(std::move(message)); +} + + ResourceProviderID ResourceProviderManagerProcess::newResourceProviderId() { ResourceProviderID resourceProviderId; http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/src/resource_provider/validation.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/validation.cpp b/src/resource_provider/validation.cpp index d8a9fb0..ddc57c3 100644 --- a/src/resource_provider/validation.cpp +++ b/src/resource_provider/validation.cpp @@ -61,6 +61,18 @@ Option<Error> validate(const Call& call) return None(); } + + case Call::UPDATE_STATE: { + if (!call.has_resource_provider_id()) { + return Error("Expecting 'resource_provider_id' to be present"); + } + + if (!call.has_update_state()) { + return Error("Expecting 'update_state' to be present"); + } + + return None(); + } } UNREACHABLE(); http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index ca49e1f..f0fd5e8 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -77,6 +77,7 @@ using process::Future; using process::Owned; using process::PID; +using process::http::Accepted; using process::http::BadRequest; using process::http::OK; using process::http::UnsupportedMediaType; @@ -210,78 +211,107 @@ TEST_P(ResourceProviderManagerHttpApiTest, UnsupportedContentMediaType) } -TEST_P(ResourceProviderManagerHttpApiTest, Subscribe) +TEST_P(ResourceProviderManagerHttpApiTest, UpdateState) { - Call call; - call.set_type(Call::SUBSCRIBE); + const ContentType contentType = GetParam(); - Call::Subscribe* subscribe = call.mutable_subscribe(); + ResourceProviderManager manager; - const v1::Resources resources = v1::Resources::parse("disk:4").get(); - subscribe->mutable_resources()->CopyFrom(resources); + Option<UUID> streamId; + Option<mesos::v1::ResourceProviderID> resourceProviderId; - mesos::v1::ResourceProviderInfo* info = - subscribe->mutable_resource_provider_info(); + // First, subscribe to the manager to get the ID. + { + Call call; + call.set_type(Call::SUBSCRIBE); - info->set_type("org.apache.mesos.rp.test"); - info->set_name("test"); + Call::Subscribe* subscribe = call.mutable_subscribe(); - const ContentType contentType = GetParam(); + mesos::v1::ResourceProviderInfo* info = + subscribe->mutable_resource_provider_info(); - http::Request request; - request.method = "POST"; - request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); - request.headers["Accept"] = stringify(contentType); - request.headers["Content-Type"] = stringify(contentType); - request.body = serialize(contentType, call); + info->set_type("org.apache.mesos.rp.test"); + info->set_name("test"); - ResourceProviderManager manager; + http::Request request; + request.method = "POST"; + request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + request.headers["Accept"] = stringify(contentType); + request.headers["Content-Type"] = stringify(contentType); + request.body = serialize(contentType, call); - Future<http::Response> response = manager.api(request, None()); + Future<http::Response> response = manager.api(request, None()); - AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); - ASSERT_EQ(http::Response::PIPE, response->type); + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + ASSERT_EQ(http::Response::PIPE, response->type); - Option<http::Pipe::Reader> reader = response->reader; - ASSERT_SOME(reader); + ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id")); + Try<UUID> uuid = UUID::fromString(response->headers.at("Mesos-Stream-Id")); - recordio::Reader<Event> responseDecoder( - ::recordio::Decoder<Event>( - lambda::bind(deserialize<Event>, contentType, lambda::_1)), - reader.get()); + CHECK_SOME(uuid); + streamId = uuid.get(); - Future<Result<Event>> event = responseDecoder.read(); - AWAIT_READY(event); - ASSERT_SOME(event.get()); + Option<http::Pipe::Reader> reader = response->reader; + ASSERT_SOME(reader); - // Check event type is subscribed and the resource provider id is set. - ASSERT_EQ(Event::SUBSCRIBED, event->get().type()); + recordio::Reader<Event> responseDecoder( + ::recordio::Decoder<Event>( + lambda::bind(deserialize<Event>, contentType, lambda::_1)), + reader.get()); - mesos::v1::ResourceProviderID resourceProviderId = - event->get().subscribed().provider_id(); + Future<Result<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); - EXPECT_FALSE(resourceProviderId.value().empty()); + // Check event type is subscribed and the resource provider id is set. + ASSERT_EQ(Event::SUBSCRIBED, event->get().type()); - // The manager will send out a message informing its subscriber - // about the newly added resources. - Future<ResourceProviderMessage> message = manager.messages().get(); + resourceProviderId = event->get().subscribed().provider_id(); - AWAIT_READY(message); + EXPECT_FALSE(resourceProviderId->value().empty()); + } - EXPECT_EQ( - ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES, - message->type); + // Then, update the total resources to the manager. + { + std::vector<v1::Resource> resources = + v1::Resources::fromString("disk:4").get(); + foreach (v1::Resource& resource, resources) { + resource.mutable_provider_id()->CopyFrom(resourceProviderId.get()); + } - // We expect `ResourceProviderID`s to be set for all subscribed resources. - // Inject them into the test expectation. - Resources expectedResources; - foreach (v1::Resource resource, resources) { - resource.mutable_provider_id()->CopyFrom(resourceProviderId); - expectedResources += devolve(resource); - } + Call call; + call.set_type(Call::UPDATE_STATE); + call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get()); + + Call::UpdateState* updateState = call.mutable_update_state(); + + updateState->mutable_resources()->CopyFrom(v1::Resources(resources)); + + http::Request request; + request.method = "POST"; + request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + request.headers["Accept"] = stringify(contentType); + request.headers["Content-Type"] = stringify(contentType); + request.headers["Mesos-Stream-Id"] = stringify(streamId.get()); + request.body = serialize(contentType, call); - EXPECT_EQ(devolve(resourceProviderId), message->updateTotalResources->id); - EXPECT_EQ(expectedResources, message->updateTotalResources->total); + Future<http::Response> response = manager.api(request, None()); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, response); + + // The manager will send out a message informing its subscriber + // about the newly added resources. + Future<ResourceProviderMessage> message = manager.messages().get(); + + AWAIT_READY(message); + + EXPECT_EQ( + ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES, + message->type); + EXPECT_EQ( + devolve(resourceProviderId.get()), message->updateTotalResources->id); + EXPECT_EQ(devolve(resources), message->updateTotalResources->total); + } } http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/src/tests/resource_provider_validation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_validation_tests.cpp b/src/tests/resource_provider_validation_tests.cpp index 862e844..e6960b1 100644 --- a/src/tests/resource_provider_validation_tests.cpp +++ b/src/tests/resource_provider_validation_tests.cpp @@ -84,6 +84,29 @@ TEST(ResourceProviderCallValidationTest, UpdateOfferOperationStatus) EXPECT_NONE(error); } + +TEST(ResourceProviderCallValidationTest, UpdateState) +{ + Call call; + call.set_type(Call::UPDATE_STATE); + + // Expecting a resource provider ID and `Call::UpdateState`. + Option<Error> error = call::validate(call); + EXPECT_SOME(error); + + ResourceProviderID* id = call.mutable_resource_provider_id(); + id->set_value(UUID::random().toString()); + + // Still expecting `Call::UpdateState`. + error = call::validate(call); + EXPECT_SOME(error); + + call.mutable_update_state(); + + error = call::validate(call); + EXPECT_NONE(error); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
