This is an automated email from the ASF dual-hosted git repository.

mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 486646adf38ee1ed206d0d35331aa4263c952565
Author: Andrei Sekretenko <[email protected]>
AuthorDate: Fri Oct 4 18:15:09 2019 -0400

    Added tests of metrics for tracking quota consumption.

    Review: https://reviews.apache.org/r/71491/
---
 src/Makefile.am                         |   1 +
 src/tests/CMakeLists.txt                |   1 +
 src/tests/consumption_metrics_tests.cpp | 412 ++++++++++++++++++++++++++++++++
 3 files changed, 414 insertions(+)

diff --git a/src/Makefile.am b/src/Makefile.am
index eff0127..0e57b28 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2622,6 +2622,7 @@ mesos_tests_SOURCES = \
   tests/cluster.hpp \
   tests/command_executor_tests.cpp \
   tests/common_validation_tests.cpp \
+  tests/consumption_metrics_tests.cpp \
   tests/container_daemon_tests.cpp \
   tests/container_logger_tests.cpp \
   tests/containerizer.cpp \
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 1e53b39..0a3c176 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -88,6 +88,7 @@ set(MESOS_TESTS_SRC
   check_tests.cpp
   command_executor_tests.cpp
   common_validation_tests.cpp
+  consumption_metrics_tests.cpp
   container_daemon_tests.cpp
   cram_md5_authentication_tests.cpp
   credentials_tests.cpp
diff --git a/src/tests/consumption_metrics_tests.cpp b/src/tests/consumption_metrics_tests.cpp
new file mode 100644
index 0000000..6516aa2
--- /dev/null
+++ b/src/tests/consumption_metrics_tests.cpp
@@ -0,0 +1,412 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <gmock/gmock.h>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+
+#include <process/metrics/metrics.hpp>
+
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+#include "master/detector/standalone.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/mesos.hpp"
+#include "tests/master/mock_master_api_subscriber.hpp"
+
+using mesos::internal::master::Master;
+using mesos::master::detector::StandaloneMasterDetector;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+
+using mesos::internal::tests::v1::MockMasterAPISubscriber;
+
+using process::Clock;
+using process::Future;
+using process::Owned;
+
+using std::string;
+
+using testing::_;
+using testing::Return;
+using testing::DoAll;
+
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// Tests for metrics tracking quota consumption.
+//
+// TODO(asekretenko): Add more tests:
+// - ensure hierarchical tracking
+// - ensure that shared resource accounting is correct
+// - ...
+
+class ConsumptionMetricsTest : public MesosTest {};
+
+
+// This test ensures that quota consumption of a launched task
+// is tracked correctly in the allocator.
+//
+// The quota consumption metric is expected:
+// - to be absent after the scheduler has subscribed
+// - to be still absent after the scheduler receives an offer
+// - to have the correct value after task launch
+// - to be absent after the task terminates
+TEST_F(ConsumptionMetricsTest, Launch)
+{
+  const string cpuConsumedKey = "allocator/mesos/quota/roles/" +
+    v1::DEFAULT_FRAMEWORK_INFO.roles(0) +
+    "/resources/cpus/consumed";
+
+  const master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  // Create a scheduler and set expectations on it.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Subscribe the framework.
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  // There should be no consumption metric at this point.
+  Clock::pause();
+  Clock::settle();
+  EXPECT_NONE(Metrics().at<JSON::Number>(cpuConsumedKey));
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  // There should still be no consumption at this point,
+  // as the offer has not been accepted yet.
+  Clock::settle();
+  EXPECT_NONE(Metrics().at<JSON::Number>(cpuConsumedKey));
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  // Acknowledge task status updates.
+  // We will also need to wait for TASK_RUNNING and TASK_KILLED.
+  const auto sendAcknowledge =
+    v1::scheduler::SendAcknowledge(frameworkId, agentId);
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillRepeatedly(sendAcknowledge);
+
+  Future<Event::Update> taskRunning;
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING)))
+    .WillOnce(DoAll(FutureArg<1>(&taskRunning), sendAcknowledge))
+    .WillRepeatedly(sendAcknowledge);
+
+  Future<Event::Update> taskKilled;
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_KILLED)))
+    .WillOnce(DoAll(FutureArg<1>(&taskKilled), sendAcknowledge))
+    .WillRepeatedly(sendAcknowledge);
+
+  // Launch a task.
+  const TaskInfo task = createTask(devolve(offer), SLEEP_COMMAND(100000));
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({evolve(task)})}));
+
+  AWAIT_READY(taskRunning);
+  Clock::settle();
+
+  // The allocator should now report the resources as quota consumption.
+  Clock::settle();
+  EXPECT_EQ(2, Metrics().values[cpuConsumedKey]);
+
+  // Kill the task and wait for the update.
+  mesos.send(v1::createCallKill(frameworkId, evolve(task.task_id()), agentId));
+  AWAIT_READY(taskKilled);
+
+  // There should be no consumption metric after the terminal task
+  // status update has been processed by the master and the allocator.
+  Clock::settle();
+  EXPECT_NONE(Metrics().at<JSON::Number>(cpuConsumedKey));
+}
+
+
+// This test ensures that a reservation is counted as consuming the role's
+// quota regardless of whether it has been allocated or not.
+TEST_F(ConsumptionMetricsTest, ReservedResource)
+{
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "role1");
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus(role1):2;mem(role1):1024";
+
+  const string cpuConsumedKey = "allocator/mesos/quota/roles/" +
+    frameworkInfo.roles(0) +
+    "/resources/cpus/consumed";
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  // Create a scheduler and set expectations on it.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Subscribe the framework.
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  // Create a slave with reserved resources.
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  // Test that the reservation is consuming the role's quota.
+  Clock::pause();
+  Clock::settle();
+  EXPECT_EQ(2, Metrics().values[cpuConsumedKey]);
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  // Acknowledge task status updates.
+  // We will also need to wait for TASK_RUNNING and TASK_KILLED.
+  const auto sendAcknowledge =
+    v1::scheduler::SendAcknowledge(frameworkId, agentId);
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillRepeatedly(sendAcknowledge);
+
+  Future<Event::Update> taskRunning;
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING)))
+    .WillOnce(DoAll(FutureArg<1>(&taskRunning), sendAcknowledge))
+    .WillRepeatedly(sendAcknowledge);
+
+  Future<Event::Update> taskKilled;
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_KILLED)))
+    .WillOnce(DoAll(FutureArg<1>(&taskKilled), sendAcknowledge))
+    .WillRepeatedly(sendAcknowledge);
+
+  // Launch a task.
+  const TaskInfo task = createTask(devolve(offer), SLEEP_COMMAND(100000));
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({evolve(task)})}));
+
+  AWAIT_READY(taskRunning);
+  Clock::settle();
+
+  // No double accounting should occur.
+  Clock::settle();
+  EXPECT_EQ(2, Metrics().values[cpuConsumedKey]);
+
+  // Kill the task and wait for the update.
+  mesos.send(v1::createCallKill(frameworkId, evolve(task.task_id()), agentId));
+  AWAIT_READY(taskKilled);
+
+  // The quota consumption should remain the same.
+  Clock::settle();
+  EXPECT_EQ(2, Metrics().values[cpuConsumedKey]);
+}
+
+
+// This test ensures that quota consumption in the allocator
+// is tracked correctly during and after master failover.
+//
+// The quota consumption metric is expected to have the correct value:
+// - after task launch
+// - after agent re-registration on failover, before the framework resubscribes
+// - after the framework resubscribes as well
+TEST_F(ConsumptionMetricsTest, MasterFailover)
+{
+  const string cpuConsumedKey = "allocator/mesos/quota/roles/" +
+    v1::DEFAULT_FRAMEWORK_INFO.roles(0) +
+    "/resources/cpus/consumed";
+
+  const master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  // Create a scheduler and set expectations on it.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> reconnected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO))
+    .WillOnce(FutureSatisfy(&reconnected));
+
+  Future<Event::Subscribed> subscribed;
+  Future<Nothing> resubscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed))
+    .WillOnce(FutureSatisfy(&resubscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .WillOnce(FutureSatisfy(&disconnected));
+
+  // Subscribe the framework.
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  // Acknowledge task status updates.
+  // We will also need to wait for TASK_RUNNING.
+  const auto sendAcknowledge =
+    v1::scheduler::SendAcknowledge(frameworkId, agentId);
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillRepeatedly(sendAcknowledge);
+
+  Future<Event::Update> taskRunning;
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING)))
+    .WillOnce(DoAll(FutureArg<1>(&taskRunning), sendAcknowledge))
+    .WillRepeatedly(sendAcknowledge);
+
+  // Launch a task.
+  const TaskInfo task = createTask(devolve(offer), SLEEP_COMMAND(100000));
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({evolve(task)})}));
+
+  AWAIT_READY(taskRunning);
+
+  // If, due to some bug, the resources are not reported as quota consumption,
+  // it makes no sense to continue.
+  Clock::pause();
+  Clock::settle();
+  ASSERT_EQ(2, Metrics().values[cpuConsumedKey]);
+
+  // Fail over the master.
+  Clock::resume();
+  detector.appoint(None());
+  master->reset();
+  AWAIT_READY(disconnected);
+
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Subscribe to the master API to wait for the agent to re-register.
+  MockMasterAPISubscriber masterSubscriber;
+  Future<Nothing> agentReAdded;
+  EXPECT_CALL(masterSubscriber, agentAdded(_))
+    .WillOnce(FutureSatisfy(&agentReAdded));
+
+  AWAIT_READY(masterSubscriber.subscribe(master.get()->pid));
+
+  // Have the agent re-register with the new master.
+  detector.appoint(master.get()->pid);
+  AWAIT_READY(agentReAdded);
+
+  // After the agent re-registers, the allocator should still report the
+  // resources as quota consumption even though the framework is disconnected.
+  Clock::pause();
+  Clock::settle();
+  EXPECT_EQ(2, Metrics().values[cpuConsumedKey]);
+
+  // Now we can proceed with reconnecting.
+  AWAIT_READY(reconnected);
+
+  // Resubscribe the framework.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  *frameworkInfo.mutable_id() = frameworkId;
+  mesos.send(v1::createCallSubscribe(frameworkInfo, frameworkId));
+  AWAIT_READY(resubscribed);
+
+  // Ensure that no double accounting occurs after re-subscription.
+  Clock::settle();
+  EXPECT_EQ(2, Metrics().values[cpuConsumedKey]);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
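
All three tests above poll the same per-role consumption metric. As a quick reference, the pattern boils down to the sketch below; it only restates what the patch already does and is not itself part of the patch. The role name "role1" is taken from the ReservedResource test, and Metrics() is the existing test helper used throughout the patch, which fetches the master's metrics snapshot as a JSON object.

    // The metric key embeds the role name and the resource name.
    const std::string key =
      "allocator/mesos/quota/roles/role1/resources/cpus/consumed";

    // The key is absent from the snapshot while the role has no quota
    // consumption (before any launch, and after the task terminates)...
    EXPECT_NONE(Metrics().at<JSON::Number>(key));

    // ...and it reports the consumed amount (2 cpus in these tests, coming
    // from a running task or a reservation) while the role consumes quota.
    EXPECT_EQ(2, Metrics().values[key]);

Because the metric disappears rather than dropping to zero, the tests check for its absence (EXPECT_NONE) before launch and after termination, and for the exact value while resources are consumed.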
