Repository: mesos Updated Branches: refs/heads/master 558042c0f -> 077b29a21
Tests for subscribe/failover functionality for HTTP based frameworks. This implements the tests for HTTP framework subscribe/failover/upgrade from a PID based framework. The tests are parameterized on content type and hence test both JSON/Protobuf responses. Review: https://reviews.apache.org/r/37082 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/67a85c6e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/67a85c6e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/67a85c6e Branch: refs/heads/master Commit: 67a85c6e885136da86bbd9bc267910fb0ec23820 Parents: 558042c Author: Anand Mazumdar <[email protected]> Authored: Fri Aug 7 10:38:31 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri Aug 7 10:38:31 2015 -0700 ---------------------------------------------------------------------- src/tests/http_api_tests.cpp | 389 +++++++++++++++++++++++++++++++++++++- 1 file changed, 388 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/67a85c6e/src/tests/http_api_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/http_api_tests.cpp b/src/tests/http_api_tests.cpp index 586d112..57d7e74 100644 --- a/src/tests/http_api_tests.cpp +++ b/src/tests/http_api_tests.cpp @@ -16,11 +16,22 @@ * limitations under the License. 
*/ +#include <string> + +#include <mesos/scheduler.hpp> + #include <process/future.hpp> #include <process/gtest.hpp> #include <process/http.hpp> #include <process/pid.hpp> +#include <stout/json.hpp> +#include <stout/lambda.hpp> +#include <stout/recordio.hpp> + +#include "common/http.hpp" +#include "common/recordio_response.hpp" + #include "master/master.hpp" #include "tests/mesos.hpp" @@ -29,18 +40,76 @@ using mesos::internal::master::Master; +using mesos::internal::RecordIOResponseReader; + +using mesos::scheduler::Call; +using mesos::scheduler::Event; + using process::Future; using process::PID; using process::http::BadRequest; +using process::http::OK; +using process::http::Pipe; using process::http::Response; +using std::string; + +using testing::WithParamInterface; + namespace mesos { namespace internal { namespace tests { -class HttpApiTest : public MesosTest {}; +class HttpApiTest + : public MesosTest, + public WithParamInterface<string> +{ +public: + Try<Event> deserialize( + const std::string& contentType, + const std::string& body) + { + if (contentType == APPLICATION_PROTOBUF) { + Event event; + if (!event.ParseFromString(body)) { + return Error("Failed to parse body into Event protobuf"); + } + return event; + } + + Try<JSON::Value> value = JSON::parse(body); + Try<Event> parse = ::protobuf::parse<Event>(value.get()); + return parse; + } + + std::string serialize(const Call& call, const std::string& contentType) + { + if (contentType == APPLICATION_PROTOBUF) { + return call.SerializeAsString(); + } + + return stringify(JSON::Protobuf(call)); + } +}; + + +// The HttpApi tests are parameterized by the content type. +INSTANTIATE_TEST_CASE_P( + ContentType, + HttpApiTest, + ::testing::Values(APPLICATION_PROTOBUF, APPLICATION_JSON)); + + +// TODO(anand): Add tests for: +// - A subscribed scheduler closes its reader and then tries to +// subscribe again before the framework failover timeout and should +// succeed. 
+// +// - A subscribed PID scheduler disconnects and then tries to +// subscribe again as a HTTP framework before the framework failover +// timeout and should succeed. // TODO(anand): Add additional tests for validation. @@ -62,6 +131,324 @@ TEST_F(HttpApiTest, NoContentType) AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); } + +// This test verifies if the scheduler is able to receive a Subscribed +// event on the stream in response to a Subscribe call request. +TEST_P(HttpApiTest, Subscribe) +{ + // TODO(anand): Enable authentication later. + master::Flags flags = CreateMasterFlags(); + flags.authenticate_frameworks = false; + + Try<PID<Master>> master = StartMaster(flags); + ASSERT_SOME(master); + + Call call; + call.set_type(Call::SUBSCRIBE); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + + // Retrieve the parameter passed as content type to this test. + const std::string contentType = GetParam(); + hashmap<string, string> headers; + headers["Accept"] = contentType; + + Future<Response> response = process::http::streaming::post( + master.get(), + "call", + headers, + serialize(call, contentType), + contentType); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding")); + ASSERT_EQ(Response::PIPE, response.get().type); + + Option<Pipe::Reader> reader = response.get().reader; + ASSERT_SOME(reader); + + auto deserializer = + lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1); + + recordio::Decoder<Event> decoder(deserializer); + RecordIOResponseReader<Event> + responseDecoder(decoder, reader.get()); + + Future<Option<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + // Check event type is subscribed and the framework id is set. 
+ ASSERT_EQ(event.get().get().type(), Event::SUBSCRIBED); + EXPECT_NE(event.get().get().subscribed().framework_id().value(), ""); + + Shutdown(); +} + + +// This test verifies if the scheduler can subscribe on retrying, +// e.g. after a ZK blip. +TEST_P(HttpApiTest, SubscribedOnRetryWithForce) +{ + // TODO(anand): Enable authentication later. + master::Flags flags = CreateMasterFlags(); + flags.authenticate_frameworks = false; + + Try<PID<Master>> master = StartMaster(flags); + ASSERT_SOME(master); + + Call call; + call.set_type(Call::SUBSCRIBE); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + + // Retrieve the parameter passed as content type to this test. + const std::string contentType = GetParam(); + hashmap<string, string> headers; + headers["Accept"] = contentType; + + Future<Response> response = process::http::streaming::post( + master.get(), + "call", + headers, + serialize(call, contentType), + contentType); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + ASSERT_EQ(Response::PIPE, response.get().type); + + Option<Pipe::Reader> reader = response.get().reader; + ASSERT_SOME(reader); + + auto deserializer = + lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1); + + recordio::Decoder<Event> decoder(deserializer); + RecordIOResponseReader<Event> + responseDecoder(decoder, reader.get()); + + Future<Option<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + // Check event type is subscribed and the framework id is set. + ASSERT_EQ(event.get().get().type(), Event::SUBSCRIBED); + EXPECT_NE(event.get().get().subscribed().framework_id().value(), ""); + + // Now subscribe again with force set. 
+ subscribe->set_force(true); + + call.mutable_framework_id()-> + CopyFrom(event.get().get().subscribed().framework_id()); + + subscribe->mutable_framework_info()->mutable_id()-> + CopyFrom(event.get().get().subscribed().framework_id()); + + auto response2 = process::http::streaming::post( + master.get(), + "call", + headers, + serialize(call, contentType), + contentType); + + Option<Pipe::Reader> reader2 = response2.get().reader; + ASSERT_SOME(reader2); + + recordio::Decoder<Event> decoder2(deserializer); + RecordIOResponseReader<Event> + responseDecoder2(decoder2, reader2.get()); + + // Check if we were successfully able to subscribe after the blip. + Future<Option<Event>> event2 = responseDecoder2.read(); + AWAIT_READY(event2); + ASSERT_FALSE(event2.isFailed()); + ASSERT_SOME(event2.get()); + + // Check event type is subscribed and the same framework id is set. + ASSERT_EQ(event2.get().get().type(), Event::SUBSCRIBED); + EXPECT_EQ(event2.get().get().subscribed().framework_id(), + event.get().get().subscribed().framework_id()); + + Shutdown(); +} + + +// This test verifies if we are able to upgrade from a pid +// based framework to http when force is set. +TEST_P(HttpApiTest, UpdatePidToHttpScheduler) +{ + // TODO(anand): Enable authentication later. + master::Flags flags = CreateMasterFlags(); + flags.authenticate_frameworks = false; + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_failover_timeout(Weeks(2).secs()); + + Try<PID<Master>> master = StartMaster(flags); + ASSERT_SOME(master); + + // Start the scheduler without credentials. + MockScheduler sched; + StandaloneMasterDetector detector(master.get()); + TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + // Check that driver is notified with an error when the http + // framework is connected. 
+ Future<FrameworkErrorMessage> errorMessage = + FUTURE_PROTOBUF(FrameworkErrorMessage(), _, _); + + EXPECT_CALL(sched, error(_, _)); + + driver.start(); + + AWAIT_READY(frameworkId); + EXPECT_NE("", frameworkId.get().value()); + + // Now try to subscribe as a HTTP framework. + Call call; + call.set_type(Call::SUBSCRIBE); + call.mutable_framework_id()->CopyFrom(frameworkId.get()); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + + subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->mutable_id()-> + CopyFrom(frameworkId.get()); + + subscribe->set_force(true); + + // Retrieve the parameter passed as content type to this test. + const std::string contentType = GetParam(); + hashmap<string, string> headers; + headers["Accept"] = contentType; + + Future<Response> response = process::http::streaming::post( + master.get(), + "call", + headers, + serialize(call, contentType), + contentType); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding")); + ASSERT_EQ(Response::PIPE, response.get().type); + + Option<Pipe::Reader> reader = response.get().reader; + ASSERT_SOME(reader); + + auto deserializer = + lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1); + + recordio::Decoder<Event> decoder(deserializer); + RecordIOResponseReader<Event> + responseDecoder(decoder, reader.get()); + + Future<Option<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + // Check event type is subscribed and the framework id is set. + ASSERT_EQ(event.get().get().type(), Event::SUBSCRIBED); + EXPECT_EQ(frameworkId.get(), + event.get().get().subscribed().framework_id()); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +// This test verifies that updating a PID based framework to HTTP +// framework fails when force is not set and the PID based +// framework is already connected. 
+TEST_P(HttpApiTest, UpdatePidToHttpSchedulerWithoutForce) +{ + // TODO(anand): Enable authentication later. + master::Flags flags = CreateMasterFlags(); + flags.authenticate_frameworks = false; + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_failover_timeout(Weeks(2).secs()); + + Try<PID<Master>> master = StartMaster(flags); + ASSERT_SOME(master); + + // Start the scheduler without credentials. + MockScheduler sched; + StandaloneMasterDetector detector(master.get()); + TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + driver.start(); + + AWAIT_READY(frameworkId); + EXPECT_NE("", frameworkId.get().value()); + + // Now try to subscribe using a HTTP framework without setting + // 'force' field. + Call call; + call.set_type(Call::SUBSCRIBE); + call.mutable_framework_id()->CopyFrom(frameworkId.get()); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->mutable_id()-> + CopyFrom(frameworkId.get()); + + // Retrieve the parameter passed as content type to this test. 
+ const std::string contentType = GetParam(); + hashmap<string, string> headers; + headers["Accept"] = contentType; + + Future<Response> response = process::http::streaming::post( + master.get(), + "call", + headers, + serialize(call, contentType), + contentType); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding")); + ASSERT_EQ(Response::PIPE, response.get().type); + + Option<Pipe::Reader> reader = response.get().reader; + ASSERT_SOME(reader); + + auto deserializer = + lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1); + + recordio::Decoder<Event> decoder(deserializer); + RecordIOResponseReader<Event> + responseDecoder(decoder, reader.get()); + + Future<Option<Event>> event = responseDecoder.read(); + AWAIT_READY(event); + ASSERT_SOME(event.get()); + + // We should be receiving an error event since the PID framework + // was already connected. + ASSERT_EQ(event.get().get().type(), Event::ERROR); + + driver.stop(); + driver.join(); + + Shutdown(); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
