This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 0f25028 Added a mock master API subscriber for testing.
0f25028 is described below
commit 0f2502883925f36775d1bf6913c80ea1b1bc9656
Author: Andrei Sekretenko <[email protected]>
AuthorDate: Sun Jun 9 22:21:12 2019 -0400
Added a mock master API subscriber for testing.
This patch introduces a class with mock methods for subscribing to the
events of master's V1 streaming API and setting expectations for them.
Review: https://reviews.apache.org/r/70671/
---
src/Makefile.am | 2 +
src/tests/CMakeLists.txt | 1 +
src/tests/master/mock_master_api_subscriber.cpp | 249 ++++++++++++++++++++++++
src/tests/master/mock_master_api_subscriber.hpp | 100 ++++++++++
4 files changed, 352 insertions(+)
diff --git a/src/Makefile.am b/src/Makefile.am
index 93b2606..a367bf7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2602,6 +2602,8 @@ mesos_tests_SOURCES =
\
tests/master_quota_tests.cpp \
tests/master_slave_reconciliation_tests.cpp \
tests/master_tests.cpp \
+ tests/master/mock_master_api_subscriber.cpp \
+ tests/master/mock_master_api_subscriber.hpp \
tests/master_validation_tests.cpp \
tests/mesos.cpp \
tests/mesos.hpp \
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index e6b1d8a..830942f 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -48,6 +48,7 @@ set(MESOS_TESTS_UTILS_SRC
flags.cpp
http_server_test_helper.cpp
main.cpp
+ master/mock_master_api_subscriber.cpp
mesos.cpp
mock_csi_plugin.cpp
mock_docker.cpp
diff --git a/src/tests/master/mock_master_api_subscriber.cpp
b/src/tests/master/mock_master_api_subscriber.cpp
new file mode 100644
index 0000000..a1cd538
--- /dev/null
+++ b/src/tests/master/mock_master_api_subscriber.cpp
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with *this work for additional information
+// regarding copyright ownership. The ASF licenses *this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use *this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include <memory>
+
+#include <process/loop.hpp>
+
+#include "tests/mesos.hpp"
+
+#include "tests/master/mock_master_api_subscriber.hpp"
+
+using mesos::v1::master::Call;
+using mesos::v1::master::Event;
+
+using mesos::internal::master::Master;
+using mesos::internal::recordio::Reader;
+
+using process::Future;
+using process::Failure;
+using process::Promise;
+
+using testing::_;
+using testing::Return;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+namespace v1 {
+
+class MockMasterAPISubscriberProcess
+ : public process::Process<MockMasterAPISubscriberProcess>
+{
+public:
+ MockMasterAPISubscriberProcess(MockMasterAPISubscriber* subscriber_)
+ : subscriber(subscriber_) {};
+
+ Future<Nothing> subscribe(
+ const process::PID<Master>& pid, ContentType contentType)
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ process::http::Headers headers =
createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Accept"] = stringify(contentType);
+
+ return process::http::streaming::post(
+ pid,
+ "api/v1",
+ headers,
+ serialize(contentType, call),
+ stringify(contentType))
+ .then(defer(self(), &Self::_subscribe, lambda::_1, contentType));
+ }
+
+protected:
+ void finalize() override
+ {
+ receiveLoop.discard();
+ }
+
+private:
+ Future<Nothing> _subscribe(
+ const process::Future<process::http::Response>& response,
+ ContentType contentType)
+ {
+ if (response.isFailed()) {
+ return Failure(
+ "Failed to subscribe to master API events: " + response.failure());
+ }
+
+ if (process::http::OK().status != response->status) {
+ return Failure(
+ "SUBSCRIBE call returned bad HTTP status code: " +
+ stringify(response->status));
+ }
+
+ if (response->type != process::http::Response::PIPE) {
+ return Failure("Response type is not PIPE");
+ }
+
+ if (response->reader.isNone()) {
+ return Failure("Response reader is set to None");
+ }
+
+ auto deserializer = lambda::bind(
+ deserialize<Event>, contentType, lambda::_1);
+
+ std::unique_ptr<Reader<Event>> reader(new Reader<Event>(
+ ::recordio::Decoder<Event>(deserializer), response->reader.get()));
+
+ auto decode = lambda::bind(
+ [](std::unique_ptr<Reader<Event>>& d) { return d->read(); },
+ std::move(reader));
+
+ receiveLoop = process::loop(
+ self(),
+ std::move(decode),
+ [this](const Result<Event>& event) -> process::ControlFlow<Nothing> {
+ if (event.isError()) {
+ LOG(FATAL) << "Failed to read master streaming API event: "
+ << event.error();
+ }
+
+ if (event.isNone()) {
+ LOG(INFO) << "Received EOF from master streaming API";
+ return process::Break();
+ }
+
+ LOG(INFO) << "Received " << event->type()
+ << " event from master streaming API";
+
+ subscriber->handleEvent(event.get());
+ return process::Continue();
+ });
+
+ LOG(INFO) << "Subscribed to master streaming API events";
+
+ receiveLoop.onAny([]() {
+ LOG(INFO) << "Stopped master streaming API receive loop";
+ });
+
+ return Nothing();
+ }
+
+ MockMasterAPISubscriber* subscriber;
+ Future<Nothing> receiveLoop;
+};
+
+
+MockMasterAPISubscriber::MockMasterAPISubscriber()
+{
+ EXPECT_CALL(*this, subscribed(_))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(*this, taskAdded(_))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(*this, taskUpdated(_))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(*this, agentAdded(_))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(*this, agentRemoved(_))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(*this, frameworkAdded(_))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(*this, frameworkUpdated(_))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(*this, frameworkRemoved(_))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(*this, heartbeat())
+ .WillRepeatedly(Return());
+
+ subscribeCalled = false;
+
+ process = new MockMasterAPISubscriberProcess(this);
+ spawn(process, true);
+}
+
+
+MockMasterAPISubscriber::~MockMasterAPISubscriber()
+{
+ process::terminate(process);
+
+ // The process holds a pointer to this object, and so
+ // we must ensure it won't access the pointer before
+ // we exit the destructor.
+ //
+ // TODO(asekretenko): Figure out a way to avoid blocking.
+ process::wait(process);
+}
+
+
+Future<Nothing> MockMasterAPISubscriber::subscribe(
+ const process::PID<Master>& pid,
+ ContentType contentType)
+{
+ if (subscribeCalled) {
+ return Failure(
+ "MockMasterAPISubscriber::subscribe should be called at most once");
+ }
+
+ subscribeCalled = true;
+
+ return dispatch(
+ process,
+ &MockMasterAPISubscriberProcess::subscribe,
+ pid,
+ contentType);
+}
+
+
+void MockMasterAPISubscriber::handleEvent(const Event& event)
+{
+ switch (event.type()) {
+ case Event::SUBSCRIBED:
+ subscribed(event.subscribed());
+ break;
+ case Event::TASK_ADDED:
+ taskAdded(event.task_added());
+ break;
+ case Event::TASK_UPDATED:
+ taskUpdated(event.task_updated());
+ break;
+ case Event::AGENT_ADDED:
+ agentAdded(event.agent_added());
+ break;
+ case Event::AGENT_REMOVED:
+ agentRemoved(event.agent_removed());
+ break;
+ case Event::FRAMEWORK_ADDED:
+ frameworkAdded(event.framework_added());
+ break;
+ case Event::FRAMEWORK_UPDATED:
+ frameworkUpdated(event.framework_updated());
+ break;
+ case Event::FRAMEWORK_REMOVED:
+ frameworkRemoved(event.framework_removed());
+ break;
+ case Event::HEARTBEAT:
+ heartbeat();
+ break;
+ case Event::UNKNOWN:
+ LOG(FATAL) << "Received event of a type UNKNOWN";
+ }
+}
+
+
+} // namespace v1 {
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
diff --git a/src/tests/master/mock_master_api_subscriber.hpp
b/src/tests/master/mock_master_api_subscriber.hpp
new file mode 100644
index 0000000..dde176a
--- /dev/null
+++ b/src/tests/master/mock_master_api_subscriber.hpp
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __TESTS_MASTER_MOCK_API_SUBSCRIBER__
+#define __TESTS_MASTER_MOCK_API_SUBSCRIBER__
+
+#include <memory>
+
+#include <gmock/gmock.h>
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <mesos/v1/master/master.hpp>
+#include <mesos/http.hpp>
+
+#include "master/master.hpp"
+
+namespace mesos {
+namespace internal {
+namespace tests {
+namespace v1 {
+
+class MockMasterAPISubscriberProcess;
+
+// This class performs subscribing to master's V1 API events
+// and provides mock methods for setting expectations for these events
+// similarly to MockHTTPScheduler.
+class MockMasterAPISubscriber
+{
+public:
+ // Mock methods which are called by this class when an event is received.
+ MOCK_METHOD1(subscribed, void(const
::mesos::v1::master::Event::Subscribed&));
+
+ MOCK_METHOD1(taskAdded, void(const ::mesos::v1::master::Event::TaskAdded&));
+
+ MOCK_METHOD1(taskUpdated,
+ void(const ::mesos::v1::master::Event::TaskUpdated&));
+
+ MOCK_METHOD1(agentAdded,
+ void(const ::mesos::v1::master::Event::AgentAdded&));
+
+ MOCK_METHOD1(agentRemoved,
+ void(const ::mesos::v1::master::Event::AgentRemoved&));
+
+ MOCK_METHOD1(frameworkAdded,
+ void(const ::mesos::v1::master::Event::FrameworkAdded&));
+
+ MOCK_METHOD1(frameworkUpdated,
+ void(const ::mesos::v1::master::Event::FrameworkUpdated&));
+
+ MOCK_METHOD1(frameworkRemoved,
+ void(const ::mesos::v1::master::Event::FrameworkRemoved&));
+
+ MOCK_METHOD0(heartbeat, void());
+
+ MockMasterAPISubscriber();
+ virtual ~MockMasterAPISubscriber();
+
+ // Subscribes to the master's V1 API event stream.
+ // The returned Future becomes ready after the SUBSCRIBE call returns a
+ // 200 OK response.
+ //
+ // NOTE: This method should be called at most once per the lifetime of the
+ // mock.
+ //
+ // NOTE: All expectations on the mock methods should be set before calling
+ // this method.
+ process::Future<Nothing> subscribe(
+ const process::PID<mesos::internal::master::Master>&,
+ ContentType contentType = ContentType::PROTOBUF);
+
+private:
+ friend class MockMasterAPISubscriberProcess;
+ void handleEvent(const ::mesos::v1::master::Event& event);
+
+ bool subscribeCalled;
+ MockMasterAPISubscriberProcess* process;
+};
+
+
+} // namespace v1 {
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __TESTS_MASTER_MOCK_API_SUBSCRIBER__