Repository: mesos
Updated Branches:
  refs/heads/master d2678e273 -> a9f834a91


Tests for subscribe/failover functionality for HTTP based framework.

This implements the tests for HTTP framework subscribe, failover and
upgrade from a PID based framework. The tests are parameterized on
content type and hence test both JSON/Protobuf responses.

Review: https://reviews.apache.org/r/37082


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bb801c5e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bb801c5e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bb801c5e

Branch: refs/heads/master
Commit: bb801c5e3c2a8a0ceb1038c99e5631747a8066a0
Parents: d2678e2
Author: Anand Mazumdar <[email protected]>
Authored: Tue Aug 11 19:53:31 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Tue Aug 11 19:53:31 2015 -0700

----------------------------------------------------------------------
 src/master/master.hpp        |   2 +-
 src/tests/http_api_tests.cpp | 376 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 376 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bb801c5e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index bb7c8e9..4e29470 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1274,7 +1274,7 @@ struct HttpConnection
 
   process::http::Pipe::Writer writer;
   ContentType contentType;
-  recordio::Encoder<v1::scheduler::Event> encoder;
+  ::recordio::Encoder<v1::scheduler::Event> encoder;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bb801c5e/src/tests/http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/http_api_tests.cpp b/src/tests/http_api_tests.cpp
index 23214df..0c356e5 100644
--- a/src/tests/http_api_tests.cpp
+++ b/src/tests/http_api_tests.cpp
@@ -16,11 +16,22 @@
  * limitations under the License.
  */
 
+#include <string>
+
+#include <mesos/scheduler.hpp>
+
 #include <process/future.hpp>
 #include <process/gtest.hpp>
 #include <process/http.hpp>
 #include <process/pid.hpp>
 
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/recordio.hpp>
+
+#include "common/http.hpp"
+#include "common/recordio.hpp"
+
 #include "master/master.hpp"
 
 #include "tests/mesos.hpp"
@@ -29,18 +40,87 @@
 
 using mesos::internal::master::Master;
 
+using mesos::internal::recordio::Reader;
+
+using mesos::scheduler::Call;
+using mesos::scheduler::Event;
+
 using process::Future;
 using process::PID;
 
 using process::http::BadRequest;
+using process::http::OK;
+using process::http::Pipe;
 using process::http::Response;
 
+using recordio::Decoder;
+
+using std::string;
+
+using testing::WithParamInterface;
+
 namespace mesos {
 namespace internal {
 namespace tests {
 
 
-class HttpApiTest : public MesosTest {};
+class HttpApiTest
+  : public MesosTest,
+    public WithParamInterface<string>
+{
+public:
+  // TODO(anand): Use the serialize/deserialize from common/http.hpp
+  // when they are available.
+  Try<Event> deserialize(
+      const std::string& contentType,
+      const std::string& body)
+  {
+    if (contentType == APPLICATION_PROTOBUF) {
+      Event event;
+      if (!event.ParseFromString(body)) {
+        return Error("Failed to parse body into Event protobuf");
+      }
+      return event;
+    }
+
+    Try<JSON::Value> value = JSON::parse(body);
+    Try<Event> parse = ::protobuf::parse<Event>(value.get());
+    return parse;
+  }
+
+  std::string serialize(const Call& call, const std::string& contentType)
+  {
+    if (contentType == APPLICATION_PROTOBUF) {
+      return call.SerializeAsString();
+    }
+
+    return stringify(JSON::Protobuf(call));
+  }
+
+  master::Flags masterFlags()
+  {
+    master::Flags flags = CreateMasterFlags();
+    flags.authenticate_frameworks = false;
+    return flags;
+  }
+};
+
+
+// The HttpApi tests are parameterized by the content type.
+INSTANTIATE_TEST_CASE_P(
+    ContentType,
+    HttpApiTest,
+    ::testing::Values(APPLICATION_PROTOBUF, APPLICATION_JSON));
+
+
+// TODO(anand): Add tests for:
+// - A subscribed scheduler closes its reader and then tries to
+//  subscribe again before the framework failover timeout and should
+//  succeed.
+//
+// - A subscribed PID scheduler disconnects and then tries to
+//  subscribe again as an HTTP framework before the framework failover
+//  timeout and should succeed.
 
 
 // TODO(anand): Add additional tests for validation.
@@ -62,6 +142,300 @@ TEST_F(HttpApiTest, NoContentType)
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
 }
 
+
+// This test verifies if the scheduler is able to receive a Subscribed
+// event on the stream in response to a Subscribe call request.
+TEST_P(HttpApiTest, Subscribe)
+{
+  Try<PID<Master>> master = StartMaster(masterFlags());
+  ASSERT_SOME(master);
+
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+  // Retrieve the parameter passed as content type to this test.
+  const std::string contentType = GetParam();
+  hashmap<string, string> headers;
+  headers["Accept"] = contentType;
+
+  Future<Response> response = process::http::streaming::post(
+      master.get(),
+      "api/v1/scheduler",
+      headers,
+      serialize(call, contentType),
+      contentType);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(Response::PIPE, response.get().type);
+
+  Option<Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  auto deserializer =
+    lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1);
+
+  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the framework id is set.
+  ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type());
+  EXPECT_NE("", event.get().get().subscribed().framework_id().value());
+
+  Shutdown();
+}
+
+
+// This test verifies if the scheduler can subscribe on retrying,
+// e.g. after a ZK blip.
+TEST_P(HttpApiTest, SubscribedOnRetryWithForce)
+{
+  Try<PID<Master>> master = StartMaster(masterFlags());
+  ASSERT_SOME(master);
+
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+  // Retrieve the parameter passed as content type to this test.
+  const std::string contentType = GetParam();
+  hashmap<string, string> headers;
+  headers["Accept"] = contentType;
+
+  auto deserializer =
+    lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1);
+
+  FrameworkID frameworkId;
+
+  {
+    Future<Response> response = process::http::streaming::post(
+        master.get(),
+        "api/v1/scheduler",
+        headers,
+        serialize(call, contentType),
+        contentType);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    ASSERT_EQ(Response::PIPE, response.get().type);
+
+    Option<Pipe::Reader> reader = response.get().reader;
+    ASSERT_SOME(reader);
+
+    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+
+    Future<Result<Event>> event = responseDecoder.read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+
+    frameworkId = event.get().get().subscribed().framework_id();
+
+    // Check event type is subscribed and the framework id is set.
+    ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type());
+    EXPECT_NE("", event.get().get().subscribed().framework_id().value());
+  }
+
+  {
+    // Now subscribe again with force set.
+    subscribe->set_force(true);
+
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+    Future<Response> response = process::http::streaming::post(
+        master.get(),
+        "api/v1/scheduler",
+        headers,
+        serialize(call, contentType),
+        contentType);
+
+    Option<Pipe::Reader> reader = response.get().reader;
+    ASSERT_SOME(reader);
+
+    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+
+    // Check if we were successfully able to subscribe after the blip.
+    Future<Result<Event>> event = responseDecoder.read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+
+    // Check event type is subscribed and the same framework id is set.
+    ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type());
+    EXPECT_EQ(frameworkId, event.get().get().subscribed().framework_id());
+  }
+
+  Shutdown();
+}
+
+
+// This test verifies if we are able to upgrade from a PID based
+// framework to HTTP when force is set.
+TEST_P(HttpApiTest, UpdatePidToHttpScheduler)
+{
+  Try<PID<Master>> master = StartMaster(masterFlags());
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  // Start the scheduler without credentials.
+  MockScheduler sched;
+  StandaloneMasterDetector detector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  // Check that driver is notified with an error when the http
+  // framework is connected.
+  Future<FrameworkErrorMessage> errorMessage =
+    FUTURE_PROTOBUF(FrameworkErrorMessage(), _, _);
+
+  EXPECT_CALL(sched, error(_, _));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  EXPECT_NE("", frameworkId.get().value());
+
+  // Now try to subscribe as an HTTP framework.
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+  call.mutable_framework_id()->CopyFrom(frameworkId.get());
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+
+  subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+  subscribe->mutable_framework_info()->mutable_id()->
+    CopyFrom(frameworkId.get());
+
+  subscribe->set_force(true);
+
+  // Retrieve the parameter passed as content type to this test.
+  const std::string contentType = GetParam();
+  hashmap<string, string> headers;
+  headers["Accept"] = contentType;
+
+  Future<Response> response = process::http::streaming::post(
+      master.get(),
+      "api/v1/scheduler",
+      headers,
+      serialize(call, contentType),
+      contentType);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(Response::PIPE, response.get().type);
+
+  Option<Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  auto deserializer =
+    lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1);
+
+  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the framework id is set.
+  ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type());
+  EXPECT_EQ(frameworkId.get(), event.get().get().subscribed().framework_id());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that updating a PID based framework to HTTP
+// framework fails when force is not set and the PID based
+// framework is already connected.
+TEST_P(HttpApiTest, UpdatePidToHttpSchedulerWithoutForce)
+{
+  Try<PID<Master>> master = StartMaster(masterFlags());
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  // Start the scheduler without credentials.
+  MockScheduler sched;
+  StandaloneMasterDetector detector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  EXPECT_NE("", frameworkId.get().value());
+
+  // Now try to subscribe using an HTTP framework without setting the
+  // 'force' field.
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+  call.mutable_framework_id()->CopyFrom(frameworkId.get());
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+  subscribe->mutable_framework_info()->mutable_id()->
+    CopyFrom(frameworkId.get());
+
+  // Retrieve the parameter passed as content type to this test.
+  const std::string contentType = GetParam();
+  hashmap<string, string> headers;
+  headers["Accept"] = contentType;
+
+  Future<Response> response = process::http::streaming::post(
+      master.get(),
+      "api/v1/scheduler",
+      headers,
+      serialize(call, contentType),
+      contentType);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(Response::PIPE, response.get().type);
+
+  Option<Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  auto deserializer =
+    lambda::bind(&HttpApiTest::deserialize, this, contentType, lambda::_1);
+
+  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // We should be receiving an error event since the PID framework
+  // was already connected.
+  ASSERT_EQ(Event::ERROR, event.get().get().type());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to