Repository: mesos
Updated Branches:
  refs/heads/master 558042c0f -> 077b29a21


Tests for subscribe/failover functionality for HTTP based framework.

This implements the tests for HTTP framework subscribe/failover/upgrade
from a PID based framework. The test are parameterized on content type
and hence test both JSON/Protobuf responses.

Review: https://reviews.apache.org/r/37082


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/67a85c6e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/67a85c6e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/67a85c6e

Branch: refs/heads/master
Commit: 67a85c6e885136da86bbd9bc267910fb0ec23820
Parents: 558042c
Author: Anand Mazumdar <[email protected]>
Authored: Fri Aug 7 10:38:31 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Fri Aug 7 10:38:31 2015 -0700

----------------------------------------------------------------------
 src/tests/http_api_tests.cpp | 389 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 388 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/67a85c6e/src/tests/http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/http_api_tests.cpp b/src/tests/http_api_tests.cpp
index 586d112..57d7e74 100644
--- a/src/tests/http_api_tests.cpp
+++ b/src/tests/http_api_tests.cpp
@@ -16,11 +16,22 @@
  * limitations under the License.
  */
 
+#include <string>
+
+#include <mesos/scheduler.hpp>
+
 #include <process/future.hpp>
 #include <process/gtest.hpp>
 #include <process/http.hpp>
 #include <process/pid.hpp>
 
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/recordio.hpp>
+
+#include "common/http.hpp"
+#include "common/recordio_response.hpp"
+
 #include "master/master.hpp"
 
 #include "tests/mesos.hpp"
@@ -29,18 +40,76 @@
 
 using mesos::internal::master::Master;
 
+using mesos::internal::RecordIOResponseReader;
+
+using mesos::scheduler::Call;
+using mesos::scheduler::Event;
+
 using process::Future;
 using process::PID;
 
 using process::http::BadRequest;
+using process::http::OK;
+using process::http::Pipe;
 using process::http::Response;
 
+using std::string;
+
+using testing::WithParamInterface;
+
 namespace mesos {
 namespace internal {
 namespace tests {
 
 
-class HttpApiTest : public MesosTest {};
+class HttpApiTest
+  : public MesosTest,
+    public WithParamInterface<string>
+{
+public:
+  Try<Event> deserialize(
+      const std::string& contentType,
+      const std::string& body)
+  {
+    if (contentType == APPLICATION_PROTOBUF) {
+      Event event;
+      if (!event.ParseFromString(body)) {
+        return Error("Failed to parse body into Event protobuf");
+      }
+      return event;
+    }
+
+    Try<JSON::Value> value = JSON::parse(body);
+    Try<Event> parse = ::protobuf::parse<Event>(value.get());
+    return parse;
+  }
+
+  std::string serialize(const Call& call, const std::string& contentType)
+  {
+    if (contentType == APPLICATION_PROTOBUF) {
+      return call.SerializeAsString();
+    }
+
+    return stringify(JSON::Protobuf(call));
+  }
+};
+
+
+// The HttpApi tests are parameterized by the content type.
+INSTANTIATE_TEST_CASE_P(
+    ContentType,
+    HttpApiTest,
+    ::testing::Values(APPLICATION_PROTOBUF, APPLICATION_JSON));
+
+
+// TODO(anand): Add tests for:
+// - A subscribed scheduler closes it's reader and then tries to
+//  subscribe again before the framework failover timeout and should
+//  succeed.
+//
+// - A subscribed PID scheduler disconnects and then tries to
+//  subscribe again as a HTTP framework before the framework failover
+//  timeout and should succeed.
 
 
 // TODO(anand): Add additional tests for validation.
@@ -62,6 +131,324 @@ TEST_F(HttpApiTest, NoContentType)
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
 }
 
+
+// This test verifies if the scheduler is able to receive a Subscribed
+// event on the stream in response to a Subscribe call request.
+TEST_P(HttpApiTest, Subscribe)
+{
+  // TODO(anand): Enable authentication later.
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+  // Retrieve the parameter passed as content type to this test.
+  const std::string contentType = GetParam();
+  hashmap<string, string> headers;
+  headers["Accept"] = contentType;
+
+  Future<Response> response = process::http::streaming::post(
+      master.get(),
+      "call",
+      headers,
+      serialize(call, contentType),
+      contentType);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(Response::PIPE, response.get().type);
+
+  Option<Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  auto deserializer =
+    lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1);
+
+  recordio::Decoder<Event> decoder(deserializer);
+  RecordIOResponseReader<Event>
+    responseDecoder(decoder, reader.get());
+
+  Future<Option<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the framework id is set.
+  ASSERT_EQ(event.get().get().type(), Event::SUBSCRIBED);
+  EXPECT_NE(event.get().get().subscribed().framework_id().value(), "");
+
+  Shutdown();
+}
+
+
+// This test verifies if the scheduler can subscribe on retrying,
+// e.g. after a ZK blip.
+TEST_P(HttpApiTest, SubscribedOnRetryWithForce)
+{
+  // TODO(anand): Enable authentication later.
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+  // Retrieve the parameter passed as content type to this test.
+  const std::string contentType = GetParam();
+  hashmap<string, string> headers;
+  headers["Accept"] = contentType;
+
+  Future<Response> response = process::http::streaming::post(
+      master.get(),
+      "call",
+      headers,
+      serialize(call, contentType),
+      contentType);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  ASSERT_EQ(Response::PIPE, response.get().type);
+
+  Option<Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  auto deserializer =
+    lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1);
+
+  recordio::Decoder<Event> decoder(deserializer);
+  RecordIOResponseReader<Event>
+    responseDecoder(decoder, reader.get());
+
+  Future<Option<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the framework id is set.
+  ASSERT_EQ(event.get().get().type(), Event::SUBSCRIBED);
+  EXPECT_NE(event.get().get().subscribed().framework_id().value(), "");
+
+  // Now subscribe again with force set.
+  subscribe->set_force(true);
+
+  call.mutable_framework_id()->
+    CopyFrom(event.get().get().subscribed().framework_id());
+
+  subscribe->mutable_framework_info()->mutable_id()->
+    CopyFrom(event.get().get().subscribed().framework_id());
+
+  auto response2 = process::http::streaming::post(
+      master.get(),
+      "call",
+      headers,
+      serialize(call, contentType),
+      contentType);
+
+  Option<Pipe::Reader> reader2 = response2.get().reader;
+  ASSERT_SOME(reader2);
+
+  recordio::Decoder<Event> decoder2(deserializer);
+  RecordIOResponseReader<Event>
+    responseDecoder2(decoder2, reader2.get());
+
+  // Check if we were successfully able to subscribe after the blip.
+  Future<Option<Event>> event2 = responseDecoder2.read();
+  AWAIT_READY(event2);
+  ASSERT_FALSE(event2.isFailed());
+  ASSERT_SOME(event2.get());
+
+  // Check event type is subscribed and the same framework id is set.
+  ASSERT_EQ(event2.get().get().type(), Event::SUBSCRIBED);
+  EXPECT_EQ(event2.get().get().subscribed().framework_id(),
+            event.get().get().subscribed().framework_id());
+
+  Shutdown();
+}
+
+
+// This test verifies if we are able to upgrade from a pid
+// based framework to http when force is set.
+TEST_P(HttpApiTest, UpdatePidToHttpScheduler)
+{
+  // TODO(anand): Enable authentication later.
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Start the scheduler without credentials.
+  MockScheduler sched;
+  StandaloneMasterDetector detector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  // Check that driver is notified with an error when the http
+  // framework is connected.
+  Future<FrameworkErrorMessage> errorMessage =
+    FUTURE_PROTOBUF(FrameworkErrorMessage(), _, _);
+
+  EXPECT_CALL(sched, error(_, _));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  EXPECT_NE("", frameworkId.get().value());
+
+  // Now try to subscribe as a HTTP framework.
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+  call.mutable_framework_id()->CopyFrom(frameworkId.get());
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+
+  subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+  subscribe->mutable_framework_info()->mutable_id()->
+    CopyFrom(frameworkId.get());
+
+  subscribe->set_force(true);
+
+  // Retrieve the parameter passed as content type to this test.
+  const std::string contentType = GetParam();
+  hashmap<string, string> headers;
+  headers["Accept"] = contentType;
+
+  Future<Response> response = process::http::streaming::post(
+      master.get(),
+      "call",
+      headers,
+      serialize(call, contentType),
+      contentType);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(Response::PIPE, response.get().type);
+
+  Option<Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  auto deserializer =
+    lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1);
+
+  recordio::Decoder<Event> decoder(deserializer);
+  RecordIOResponseReader<Event>
+    responseDecoder(decoder, reader.get());
+
+  Future<Option<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the framework id is set.
+  ASSERT_EQ(event.get().get().type(), Event::SUBSCRIBED);
+  EXPECT_EQ(frameworkId.get(),
+            event.get().get().subscribed().framework_id());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that updating a PID based framework to HTTP
+// framework fails when force is not set and the PID based
+// framework is already connected.
+TEST_P(HttpApiTest, UpdatePidToHttpSchedulerWithoutForce)
+{
+  // TODO(anand): Enable authentication later.
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Start the scheduler without credentials.
+  MockScheduler sched;
+  StandaloneMasterDetector detector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  EXPECT_NE("", frameworkId.get().value());
+
+  // Now try to subscribe using a HTTP framework without setting
+  // 'force' field.
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+  call.mutable_framework_id()->CopyFrom(frameworkId.get());
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+  subscribe->mutable_framework_info()->mutable_id()->
+    CopyFrom(frameworkId.get());
+
+  // Retrieve the parameter passed as content type to this test.
+  const std::string contentType = GetParam();
+  hashmap<string, string> headers;
+  headers["Accept"] = contentType;
+
+  Future<Response> response = process::http::streaming::post(
+      master.get(),
+      "call",
+      headers,
+      serialize(call, contentType),
+      contentType);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(Response::PIPE, response.get().type);
+
+  Option<Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  auto deserializer =
+    lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1);
+
+  recordio::Decoder<Event> decoder(deserializer);
+  RecordIOResponseReader<Event>
+    responseDecoder(decoder, reader.get());
+
+  Future<Option<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // We should be receiving an error event since the PID framework
+  // was already connected.
+  ASSERT_EQ(event.get().get().type(), Event::ERROR);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to