Repository: mesos Updated Branches: refs/heads/master d2678e273 -> a9f834a91
Tests for subscribe/failover functionality for HTTP based framework. This implements the tests for HTTP framework subscribe, failover and upgrade from a PID based framework. The tests are parameterized on content type and hence test both JSON/Protobuf responses. Review: https://reviews.apache.org/r/37082 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bb801c5e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bb801c5e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bb801c5e Branch: refs/heads/master Commit: bb801c5e3c2a8a0ceb1038c99e5631747a8066a0 Parents: d2678e2 Author: Anand Mazumdar <[email protected]> Authored: Tue Aug 11 19:53:31 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Tue Aug 11 19:53:31 2015 -0700 ---------------------------------------------------------------------- src/master/master.hpp | 2 +- src/tests/http_api_tests.cpp | 376 +++++++++++++++++++++++++++++++++++++- 2 files changed, 376 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/bb801c5e/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index bb7c8e9..4e29470 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1274,7 +1274,7 @@ struct HttpConnection process::http::Pipe::Writer writer; ContentType contentType; - recordio::Encoder<v1::scheduler::Event> encoder; + ::recordio::Encoder<v1::scheduler::Event> encoder; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/bb801c5e/src/tests/http_api_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/http_api_tests.cpp b/src/tests/http_api_tests.cpp index 23214df..0c356e5 100644 --- a/src/tests/http_api_tests.cpp +++ b/src/tests/http_api_tests.cpp @@ -16,11 +16,22 @@ * limitations under the License. */ +#include <string> + +#include <mesos/scheduler.hpp> + #include <process/future.hpp> #include <process/gtest.hpp> #include <process/http.hpp> #include <process/pid.hpp> +#include <stout/json.hpp> +#include <stout/lambda.hpp> +#include <stout/recordio.hpp> + +#include "common/http.hpp" +#include "common/recordio.hpp" + #include "master/master.hpp" #include "tests/mesos.hpp" @@ -29,18 +40,87 @@ using mesos::internal::master::Master; +using mesos::internal::recordio::Reader; + +using mesos::scheduler::Call; +using mesos::scheduler::Event; + using process::Future; using process::PID; using process::http::BadRequest; +using process::http::OK; +using process::http::Pipe; using process::http::Response; +using recordio::Decoder; + +using std::string; + +using testing::WithParamInterface; + namespace mesos { namespace internal { namespace tests { -class HttpApiTest : public MesosTest {}; +class HttpApiTest + : public MesosTest, + public WithParamInterface<string> +{ +public: + // TODO(anand): Use the serialize/deserialize from common/http.hpp + // when they are available. + Try<Event> deserialize( + const std::string& contentType, + const std::string& body) + { + if (contentType == APPLICATION_PROTOBUF) { + Event event; + if (!event.ParseFromString(body)) { + return Error("Failed to parse body into Event protobuf"); + } + return event; + } + + Try<JSON::Value> value = JSON::parse(body); + Try<Event> parse = ::protobuf::parse<Event>(value.get()); + return parse; + } + + std::string serialize(const Call& call, const std::string& contentType) + { + if (contentType == APPLICATION_PROTOBUF) { + return call.SerializeAsString(); + } + + return stringify(JSON::Protobuf(call)); + } + + master::Flags masterFlags() + { + master::Flags flags = CreateMasterFlags(); + flags.authenticate_frameworks = false; + return flags; + } +}; + + +// The HttpApi tests are parameterized by the content type. +INSTANTIATE_TEST_CASE_P( + ContentType, + HttpApiTest, + ::testing::Values(APPLICATION_PROTOBUF, APPLICATION_JSON)); + + +// TODO(anand): Add tests for: +// - A subscribed scheduler closes it's reader and then tries to +// subscribe again before the framework failover timeout and should +// succeed. +// +// - A subscribed PID scheduler disconnects and then tries to +// subscribe again as a HTTP framework before the framework failover +// timeout and should succeed. // TODO(anand): Add additional tests for validation. @@ -62,6 +142,300 @@ TEST_F(HttpApiTest, NoContentType) AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); } + +// This test verifies if the scheduler is able to receive a Subscribed +// event on the stream in response to a Subscribe call request. +TEST_P(HttpApiTest, Subscribe) +{ + Try<PID<Master>> master = StartMaster(masterFlags()); + ASSERT_SOME(master); + + Call call; + call.set_type(Call::SUBSCRIBE); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + + // Retrieve the parameter passed as content type to this test. + const std::string contentType = GetParam(); + hashmap<string, string> headers; + headers["Accept"] = contentType; + + Future<Response> response = process::http::streaming::post( + master.get(), + "api/v1/scheduler", + headers, + serialize(call, contentType), + contentType); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding")); + ASSERT_EQ(Response::PIPE, response.get().type); + + Option<Pipe::Reader> reader = response.get().reader; + ASSERT_SOME(reader); + + auto deserializer = + lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1); + + Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + + Future<Result<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + // Check event type is subscribed and the framework id is set. + ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type()); + EXPECT_NE("", event.get().get().subscribed().framework_id().value()); + + Shutdown(); +} + + +// This test verifies if the scheduler can subscribe on retrying, +// e.g. after a ZK blip. +TEST_P(HttpApiTest, SubscribedOnRetryWithForce) +{ + Try<PID<Master>> master = StartMaster(masterFlags()); + ASSERT_SOME(master); + + Call call; + call.set_type(Call::SUBSCRIBE); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + + // Retrieve the parameter passed as content type to this test. + const std::string contentType = GetParam(); + hashmap<string, string> headers; + headers["Accept"] = contentType; + + auto deserializer = + lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1); + + FrameworkID frameworkId; + + { + Future<Response> response = process::http::streaming::post( + master.get(), + "api/v1/scheduler", + headers, + serialize(call, contentType), + contentType); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + ASSERT_EQ(Response::PIPE, response.get().type); + + Option<Pipe::Reader> reader = response.get().reader; + ASSERT_SOME(reader); + + Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + + Future<Result<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + frameworkId = event.get().get().subscribed().framework_id(); + + // Check event type is subscribed and the framework id is set. + ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type()); + EXPECT_NE("", event.get().get().subscribed().framework_id().value()); + } + + { + // Now subscribe again with force set. + subscribe->set_force(true); + + call.mutable_framework_id()->CopyFrom(frameworkId); + subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId); + + Future<Response> response = process::http::streaming::post( + master.get(), + "api/v1/scheduler", + headers, + serialize(call, contentType), + contentType); + + Option<Pipe::Reader> reader = response.get().reader; + ASSERT_SOME(reader); + + Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + + // Check if we were successfully able to subscribe after the blip. + Future<Result<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + // Check event type is subscribed and the same framework id is set. + ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type()); + EXPECT_EQ(frameworkId, event.get().get().subscribed().framework_id()); + } + + Shutdown(); +} + + +// This test verifies if we are able to upgrade from a PID based +// framework to HTTP when force is set. +TEST_P(HttpApiTest, UpdatePidToHttpScheduler) +{ + Try<PID<Master>> master = StartMaster(masterFlags()); + ASSERT_SOME(master); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_failover_timeout(Weeks(2).secs()); + + // Start the scheduler without credentials. + MockScheduler sched; + StandaloneMasterDetector detector(master.get()); + TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + // Check that driver is notified with an error when the http + // framework is connected. + Future<FrameworkErrorMessage> errorMessage = + FUTURE_PROTOBUF(FrameworkErrorMessage(), _, _); + + EXPECT_CALL(sched, error(_, _)); + + driver.start(); + + AWAIT_READY(frameworkId); + EXPECT_NE("", frameworkId.get().value()); + + // Now try to subscribe as an HTTP framework. + Call call; + call.set_type(Call::SUBSCRIBE); + call.mutable_framework_id()->CopyFrom(frameworkId.get()); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + + subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->mutable_id()-> + CopyFrom(frameworkId.get()); + + subscribe->set_force(true); + + // Retrieve the parameter passed as content type to this test. + const std::string contentType = GetParam(); + hashmap<string, string> headers; + headers["Accept"] = contentType; + + Future<Response> response = process::http::streaming::post( + master.get(), + "api/v1/scheduler", + headers, + serialize(call, contentType), + contentType); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding")); + ASSERT_EQ(Response::PIPE, response.get().type); + + Option<Pipe::Reader> reader = response.get().reader; + ASSERT_SOME(reader); + + auto deserializer = + lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1); + + Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + + Future<Result<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + // Check event type is subscribed and the framework id is set. + ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type()); + EXPECT_EQ(frameworkId.get(), event.get().get().subscribed().framework_id()); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +// This test verifies that updating a PID based framework to HTTP +// framework fails when force is not set and the PID based +// framework is already connected. +TEST_P(HttpApiTest, UpdatePidToHttpSchedulerWithoutForce) +{ + Try<PID<Master>> master = StartMaster(masterFlags()); + ASSERT_SOME(master); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_failover_timeout(Weeks(2).secs()); + + // Start the scheduler without credentials. + MockScheduler sched; + StandaloneMasterDetector detector(master.get()); + TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + driver.start(); + + AWAIT_READY(frameworkId); + EXPECT_NE("", frameworkId.get().value()); + + // Now try to subscribe using a HTTP framework without setting the + // 'force' field. + Call call; + call.set_type(Call::SUBSCRIBE); + call.mutable_framework_id()->CopyFrom(frameworkId.get()); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->mutable_id()-> + CopyFrom(frameworkId.get()); + + // Retrieve the parameter passed as content type to this test. + const std::string contentType = GetParam(); + hashmap<string, string> headers; + headers["Accept"] = contentType; + + Future<Response> response = process::http::streaming::post( + master.get(), + "api/v1/scheduler", + headers, + serialize(call, contentType), + contentType); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding")); + ASSERT_EQ(Response::PIPE, response.get().type); + + Option<Pipe::Reader> reader = response.get().reader; + ASSERT_SOME(reader); + + auto deserializer = + lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1); + + Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get()); + + Future<Result<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + // We should be receiving an error event since the PID framework + // was already connected. + ASSERT_EQ(Event::ERROR, event.get().get().type()); + + driver.stop(); + driver.join(); + + Shutdown(); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
