Repository: mesos Updated Branches: refs/heads/1.5.x d85cd885f -> f023091fb
Fixed resource provider driver disconnection handling. When the resource provider driver disconnects, the resource provider's disconnection handlers are expected to be invoked and a new connection to be detected. This patch fixes the issue by transitioning the future returned by ConstantEndpointDetector to DISCARDED if a "discard" is initiated by the caller. This properly triggers the `detected` callback. This patch is based on: https://reviews.apache.org/r/64806/ Review: https://reviews.apache.org/r/64856 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f023091f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f023091f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f023091f Branch: refs/heads/1.5.x Commit: f023091fb45e2eac766a79ee88482f4120f57264 Parents: d85cd88 Author: Benjamin Bannier <[email protected]> Authored: Fri Dec 22 13:24:53 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Wed Dec 27 13:43:12 2017 -0800 ---------------------------------------------------------------------- src/resource_provider/detector.cpp | 26 +++++- src/resource_provider/http_connection.hpp | 1 + src/tests/resource_provider_manager_tests.cpp | 96 ++++++++++++++++++++++ 3 files changed, 122 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f023091f/src/resource_provider/detector.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/detector.cpp b/src/resource_provider/detector.cpp index 59f2b9b..e878d54 100644 --- a/src/resource_provider/detector.cpp +++ b/src/resource_provider/detector.cpp @@ -16,9 +16,15 @@ #include "resource_provider/detector.hpp" +#include <memory> +#include <utility> + +#include <stout/lambda.hpp> + namespace http = process::http; using process::Future; +using process::Promise; 
namespace mesos { namespace internal { @@ -33,7 +39,25 @@ Future<Option<http::URL>> ConstantEndpointDetector::detect( if (previous.isNone() || stringify(previous.get()) != stringify(url)) { return url; } else { - return Future<Option<http::URL>>(); // A pending future. + // Use a promise here to properly handle discard semantics. + std::unique_ptr<Promise<Option<http::URL>>> promise( + new Promise<Option<http::URL>>()); + + Future<Option<http::URL>> future = promise->future(); + + // TODO(jieyu): There is a cyclic dependency here because `future` + // holds a reference to `promise` and `promise` holds a reference + // to the `future`. It won't get properly cleaned up if + // `future.discard()` is not called and `future` is not terminal. + // Currently, it's OK because the caller always do a + // `future.discard()` before removing the reference to `future`. + future.onDiscard(lambda::partial( + [](std::unique_ptr<Promise<Option<http::URL>>> promise) { + promise->discard(); + }, + std::move(promise))); + + return future; } } http://git-wip-us.apache.org/repos/asf/mesos/blob/f023091f/src/resource_provider/http_connection.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/http_connection.hpp b/src/resource_provider/http_connection.hpp index 3d5088d..add5acc 100644 --- a/src/resource_provider/http_connection.hpp +++ b/src/resource_provider/http_connection.hpp @@ -322,6 +322,7 @@ protected: subscribed = None(); endpoint = None(); connectionId = None(); + detection.discard(); } void disconnected(const id::UUID& _connectionId, const std::string& failure) http://git-wip-us.apache.org/repos/asf/mesos/blob/f023091f/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index f58ab6b..096747e 100644 --- a/src/tests/resource_provider_manager_tests.cpp 
+++ b/src/tests/resource_provider_manager_tests.cpp @@ -90,7 +90,9 @@ using process::http::UnsupportedMediaType; using std::string; using std::vector; +using testing::DoAll; using testing::Eq; +using testing::Invoke; using testing::SaveArg; using testing::Values; using testing::WithParamInterface; @@ -1295,6 +1297,100 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResourceProviderDisconnect) } } + +// This test verifies that if a second resource provider subscribes +// with the ID of an already connected resource provider, the first +// instance gets disconnected and the second subscription is handled +// as a resubscription. +TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect) +{ + Clock::pause(); + + // Start master and agent. + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + slave::Flags slaveFlags = CreateSlaveFlags(); + + Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(agent); + + Clock::advance(slaveFlags.registration_backoff_factor); + Clock::settle(); + AWAIT_READY(updateSlaveMessage); + + mesos::v1::ResourceProviderInfo resourceProviderInfo; + resourceProviderInfo.set_type("org.apache.mesos.rp.test"); + resourceProviderInfo.set_name("test"); + + Owned<v1::MockResourceProvider> resourceProvider1( + new v1::MockResourceProvider(resourceProviderInfo)); + + // Start and register a resource provider. 
+ string scheme = "http"; + +#ifdef USE_SSL_SOCKET + if (process::network::openssl::flags().enabled) { + scheme = "https"; + } +#endif + + http::URL url( + scheme, + agent.get()->pid.address.ip, + agent.get()->pid.address.port, + agent.get()->pid.id + "/api/v1/resource_provider"); + + Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url)); + + Future<Event::Subscribed> subscribed1; + EXPECT_CALL(*resourceProvider1, subscribed(_)) + .WillOnce(FutureArg<0>(&subscribed1)); + + resourceProvider1->start( + endpointDetector, + ContentType::PROTOBUF, + v1::DEFAULT_CREDENTIAL); + + AWAIT_READY(subscribed1); + + resourceProviderInfo.mutable_id()->CopyFrom(subscribed1->provider_id()); + + // Subscribing a second resource provider with the same ID will + // disconnect the first instance and handle the subscription by the + // second resource provider as a resubscription. + Owned<v1::MockResourceProvider> resourceProvider2( + new v1::MockResourceProvider(resourceProviderInfo)); + + // We terminate the first resource provider once we have confirmed + // that it got disconnected. This prevents it from resubscribing in + // turn and racing with the second resource provider. + Future<Nothing> disconnected1; + EXPECT_CALL(*resourceProvider1, disconnected()) + .WillOnce(DoAll( + FutureSatisfy(&disconnected1), + Invoke([&resourceProvider1]() { resourceProvider1.reset(); }))) + .WillRepeatedly(Return()); // Ignore spurious calls concurrent with `reset`. + + Future<Event::Subscribed> subscribed2; + EXPECT_CALL(*resourceProvider2, subscribed(_)) + .WillOnce(FutureArg<0>(&subscribed2)); + + resourceProvider2->start( + endpointDetector, + ContentType::PROTOBUF, + v1::DEFAULT_CREDENTIAL); + + AWAIT_READY(disconnected1); + AWAIT_READY(subscribed2); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
