This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f25028  Added a mock master API subscriber for testing.
0f25028 is described below

commit 0f2502883925f36775d1bf6913c80ea1b1bc9656
Author: Andrei Sekretenko <[email protected]>
AuthorDate: Sun Jun 9 22:21:12 2019 -0400

    Added a mock master API subscriber for testing.
    
    This patch introduces a class with mock methods for subscribing to the
    events of master's V1 streaming API and setting expectations for them.
    
    Review: https://reviews.apache.org/r/70671/
---
 src/Makefile.am                                 |   2 +
 src/tests/CMakeLists.txt                        |   1 +
 src/tests/master/mock_master_api_subscriber.cpp | 249 ++++++++++++++++++++++++
 src/tests/master/mock_master_api_subscriber.hpp | 100 ++++++++++
 4 files changed, 352 insertions(+)

diff --git a/src/Makefile.am b/src/Makefile.am
index 93b2606..a367bf7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2602,6 +2602,8 @@ mesos_tests_SOURCES =                                     \
   tests/master_quota_tests.cpp                                 \
   tests/master_slave_reconciliation_tests.cpp                  \
   tests/master_tests.cpp                                       \
+  tests/master/mock_master_api_subscriber.cpp                  \
+  tests/master/mock_master_api_subscriber.hpp                  \
   tests/master_validation_tests.cpp                            \
   tests/mesos.cpp                                              \
   tests/mesos.hpp                                              \
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index e6b1d8a..830942f 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -48,6 +48,7 @@ set(MESOS_TESTS_UTILS_SRC
   flags.cpp
   http_server_test_helper.cpp
   main.cpp
+  master/mock_master_api_subscriber.cpp
   mesos.cpp
   mock_csi_plugin.cpp
   mock_docker.cpp
diff --git a/src/tests/master/mock_master_api_subscriber.cpp b/src/tests/master/mock_master_api_subscriber.cpp
new file mode 100644
index 0000000..a1cd538
--- /dev/null
+++ b/src/tests/master/mock_master_api_subscriber.cpp
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include <memory>
+
+#include <process/loop.hpp>
+
+#include "tests/mesos.hpp"
+
+#include "tests/master/mock_master_api_subscriber.hpp"
+
+using mesos::v1::master::Call;
+using mesos::v1::master::Event;
+
+using mesos::internal::master::Master;
+using mesos::internal::recordio::Reader;
+
+using process::Future;
+using process::Failure;
+using process::Promise;
+
+using testing::_;
+using testing::Return;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+namespace v1 {
+
+// Libprocess process that performs the SUBSCRIBE call against the
+// master's V1 API and hands every decoded event to the
+// `MockMasterAPISubscriber` it was constructed with.
+class MockMasterAPISubscriberProcess
+  : public process::Process<MockMasterAPISubscriberProcess>
+{
+public:
+  // `subscriber_` is stored as a raw, non-owning pointer; it is
+  // dereferenced from the receive loop for every received event.
+  explicit MockMasterAPISubscriberProcess(
+      MockMasterAPISubscriber* subscriber_)
+    : subscriber(subscriber_) {}
+
+  // Posts a SUBSCRIBE call to the "api/v1" endpoint of `pid` through the
+  // streaming HTTP client and continues in `_subscribe()`, deferred onto
+  // this process. The request carries basic-auth headers built from
+  // `DEFAULT_CREDENTIAL` and an "Accept" header set to `contentType`.
+  Future<Nothing> subscribe(
+    const process::PID<Master>& pid, ContentType contentType)
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    process::http::Headers headers =
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+    headers["Accept"] = stringify(contentType);
+
+    return process::http::streaming::post(
+        pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType))
+      .then(defer(self(), &Self::_subscribe, lambda::_1, contentType));
+  }
+
+protected:
+  // Requests a discard of the receive loop when the process is finalized.
+  void finalize() override
+  {
+    receiveLoop.discard();
+  }
+
+private:
+  // Validates the SUBSCRIBE response and, if it is a 200 OK streaming
+  // (PIPE) response with a reader, starts the loop that decodes events
+  // from it. Returns a failure describing the first check that did not
+  // pass, or `Nothing()` once the loop has been started.
+  Future<Nothing> _subscribe(
+    const process::Future<process::http::Response>& response,
+    ContentType contentType)
+  {
+    // NOTE(review): this is invoked through `.then()`, which presumably
+    // only runs for a ready future, so this branch may be unreachable;
+    // confirm before removing it.
+    if (response.isFailed()) {
+      return Failure(
+        "Failed to subscribe to master API events: " + response.failure());
+    }
+
+    if (process::http::OK().status != response->status) {
+      return Failure(
+        "SUBSCRIBE call returned bad HTTP status code: " +
+        stringify(response->status));
+    }
+
+    if (response->type != process::http::Response::PIPE) {
+      return Failure("Response type is not PIPE");
+    }
+
+    if (response->reader.isNone()) {
+      return Failure("Response reader is set to None");
+    }
+
+    auto deserializer = lambda::bind(
+        deserialize<Event>, contentType, lambda::_1);
+
+    std::unique_ptr<Reader<Event>> reader(new Reader<Event>(
+        ::recordio::Decoder<Event>(deserializer), response->reader.get()));
+
+    // The reader is moved into the bound callable so that it stays alive
+    // for as long as the loop holds on to `decode`.
+    auto decode = lambda::bind(
+        [](std::unique_ptr<Reader<Event>>& d) { return d->read(); },
+        std::move(reader));
+
+    // Each iteration reads one event on this process and forwards it to
+    // the subscriber. A read error is fatal; a `None` result (EOF) ends
+    // the loop.
+    receiveLoop = process::loop(
+        self(),
+        std::move(decode),
+        [this](const Result<Event>& event) -> process::ControlFlow<Nothing> {
+          if (event.isError()) {
+            LOG(FATAL) << "Failed to read master streaming API event: "
+                       << event.error();
+          }
+
+          if (event.isNone()) {
+            LOG(INFO) << "Received EOF from master streaming API";
+            return process::Break();
+          }
+
+          LOG(INFO) << "Received " << event->type()
+                    << " event from master streaming API";
+
+          subscriber->handleEvent(event.get());
+          return process::Continue();
+        });
+
+    LOG(INFO) << "Subscribed to master streaming API events";
+
+    receiveLoop.onAny([]() {
+      LOG(INFO) << "Stopped master streaming API receive loop";
+    });
+
+    return Nothing();
+  }
+
+  // Non-owning pointer to the mock that receives the events.
+  MockMasterAPISubscriber* subscriber;
+
+  // Future of the event receive loop; assigned in `_subscribe()` and
+  // discarded in `finalize()`.
+  Future<Nothing> receiveLoop;
+};
+
+
+// Installs catch-all default expectations on every mock method and
+// spawns the helper process that later performs the subscription.
+MockMasterAPISubscriber::MockMasterAPISubscriber()
+  : subscribeCalled(false),
+    process(new MockMasterAPISubscriberProcess(this))
+{
+  // Out of the box, every kind of event is accepted any number of times.
+  EXPECT_CALL(*this, subscribed(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*this, taskAdded(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*this, taskUpdated(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*this, agentAdded(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*this, agentRemoved(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*this, frameworkAdded(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*this, frameworkUpdated(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*this, frameworkRemoved(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*this, heartbeat()).WillRepeatedly(Return());
+
+  // NOTE(review): the `true` argument presumably asks libprocess to
+  // manage (and eventually delete) the process; confirm against the
+  // libprocess `spawn()` documentation.
+  process::spawn(process, true);
+}
+
+
+// Terminates the helper process and blocks until it has stopped.
+MockMasterAPISubscriber::~MockMasterAPISubscriber()
+{
+  // The helper process keeps a raw pointer back to this mock, so it has
+  // to be fully stopped before this destructor returns; otherwise it
+  // could still use the pointer afterwards.
+  process::terminate(process);
+
+  // TODO(asekretenko): Figure out a way to avoid blocking.
+  process::wait(process);
+}
+
+
+// Forwards the subscription request to the helper process. Only the
+// first call is dispatched; every later call returns a failure.
+Future<Nothing> MockMasterAPISubscriber::subscribe(
+    const process::PID<Master>& pid,
+    ContentType contentType)
+{
+  if (!subscribeCalled) {
+    subscribeCalled = true;
+
+    return process::dispatch(
+        process,
+        &MockMasterAPISubscriberProcess::subscribe,
+        pid,
+        contentType);
+  }
+
+  return Failure(
+    "MockMasterAPISubscriber::subscribe should be called at most once");
+}
+
+
+// Routes a received event to the mock method matching its type, passing
+// along the event's type-specific payload (HEARTBEAT carries none).
+// An event of type UNKNOWN is treated as fatal.
+void MockMasterAPISubscriber::handleEvent(const Event& event)
+{
+  switch (event.type()) {
+    case Event::SUBSCRIBED:
+      return subscribed(event.subscribed());
+
+    case Event::TASK_ADDED:
+      return taskAdded(event.task_added());
+
+    case Event::TASK_UPDATED:
+      return taskUpdated(event.task_updated());
+
+    case Event::AGENT_ADDED:
+      return agentAdded(event.agent_added());
+
+    case Event::AGENT_REMOVED:
+      return agentRemoved(event.agent_removed());
+
+    case Event::FRAMEWORK_ADDED:
+      return frameworkAdded(event.framework_added());
+
+    case Event::FRAMEWORK_UPDATED:
+      return frameworkUpdated(event.framework_updated());
+
+    case Event::FRAMEWORK_REMOVED:
+      return frameworkRemoved(event.framework_removed());
+
+    case Event::HEARTBEAT:
+      return heartbeat();
+
+    case Event::UNKNOWN:
+      LOG(FATAL) << "Received event of a type UNKNOWN";
+  }
+}
+
+
+} // namespace v1 {
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
diff --git a/src/tests/master/mock_master_api_subscriber.hpp b/src/tests/master/mock_master_api_subscriber.hpp
new file mode 100644
index 0000000..dde176a
--- /dev/null
+++ b/src/tests/master/mock_master_api_subscriber.hpp
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __TESTS_MASTER_MOCK_API_SUBSCRIBER__
+#define __TESTS_MASTER_MOCK_API_SUBSCRIBER__
+
+#include <memory>
+
+#include <gmock/gmock.h>
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <mesos/v1/master/master.hpp>
+#include <mesos/http.hpp>
+
+#include "master/master.hpp"
+
+namespace mesos {
+namespace internal {
+namespace tests {
+namespace v1 {
+
+class MockMasterAPISubscriberProcess;
+
+// Subscribes to the event stream of the master's V1 API and exposes one
+// mock method per event type, so that tests can set expectations on
+// received events in a way similar to MockHTTPScheduler.
+class MockMasterAPISubscriber
+{
+public:
+  // Mock methods which are called by this class when an event of the
+  // corresponding type is received. Each one receives the type-specific
+  // payload of the event; `heartbeat` takes no argument.
+  MOCK_METHOD1(subscribed,
+               void(const ::mesos::v1::master::Event::Subscribed&));
+
+  MOCK_METHOD1(taskAdded,
+               void(const ::mesos::v1::master::Event::TaskAdded&));
+
+  MOCK_METHOD1(taskUpdated,
+               void(const ::mesos::v1::master::Event::TaskUpdated&));
+
+  MOCK_METHOD1(agentAdded,
+               void(const ::mesos::v1::master::Event::AgentAdded&));
+
+  MOCK_METHOD1(agentRemoved,
+               void(const ::mesos::v1::master::Event::AgentRemoved&));
+
+  MOCK_METHOD1(frameworkAdded,
+               void(const ::mesos::v1::master::Event::FrameworkAdded&));
+
+  MOCK_METHOD1(frameworkUpdated,
+               void(const ::mesos::v1::master::Event::FrameworkUpdated&));
+
+  MOCK_METHOD1(frameworkRemoved,
+               void(const ::mesos::v1::master::Event::FrameworkRemoved&));
+
+  MOCK_METHOD0(heartbeat, void());
+
+  MockMasterAPISubscriber();
+  virtual ~MockMasterAPISubscriber();
+
+  // Subscribes to the master's V1 API event stream.
+  // The returned Future becomes ready after the SUBSCRIBE call returns a
+  // 200 OK response.
+  //
+  // NOTE: This method should be called at most once during the lifetime
+  // of the mock.
+  //
+  // NOTE: All expectations on the mock methods should be set before
+  // calling this method.
+  process::Future<Nothing> subscribe(
+    const process::PID<mesos::internal::master::Master>&,
+    ContentType contentType = ContentType::PROTOBUF);
+
+private:
+  // The helper process needs access to `handleEvent()`.
+  friend class MockMasterAPISubscriberProcess;
+
+  // Invokes the mock method that corresponds to the type of `event`.
+  void handleEvent(const ::mesos::v1::master::Event& event);
+
+  // Whether `subscribe()` has already been called on this mock.
+  bool subscribeCalled;
+
+  // Helper process performing the subscription and reading the events.
+  MockMasterAPISubscriberProcess* process;
+};
+
+
+} // namespace v1 {
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __TESTS_MASTER_MOCK_API_SUBSCRIBER__

Reply via email to