Repository: mesos Updated Branches: refs/heads/master 1b807ab14 -> 4e86a8c1c
Migrate /monitor/statistics and /monitor/statistics.json to slave. These two endpoints and their underlying logics are moved from ResourceMonitorProcess to slave process. ResourceMonitor is removed. Review: https://reviews.apache.org/r/45381/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e86a8c1 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e86a8c1 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e86a8c1 Branch: refs/heads/master Commit: 4e86a8c1cc0c10ce53a5b79c13114143da9b0498 Parents: 28e5c1c Author: Jay Guo <guojian...@cn.ibm.com> Authored: Wed Apr 6 09:29:23 2016 -0700 Committer: Jie Yu <yujie....@gmail.com> Committed: Wed Apr 6 11:14:46 2016 -0700 ---------------------------------------------------------------------- src/CMakeLists.txt | 1 - src/Makefile.am | 3 - src/slave/http.cpp | 72 +++++++++ src/slave/monitor.cpp | 175 -------------------- src/slave/monitor.hpp | 52 ------ src/slave/slave.cpp | 11 +- src/slave/slave.hpp | 19 ++- src/tests/limiter.hpp | 2 +- src/tests/mesos.cpp | 8 + src/tests/mesos.hpp | 4 + src/tests/monitor_tests.cpp | 255 ------------------------------ src/tests/oversubscription_tests.cpp | 12 +- src/tests/slave_tests.cpp | 215 +++++++++++++++++++++++++ 13 files changed, 328 insertions(+), 501 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ff225c0..9a4cffa 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -246,7 +246,6 @@ set(AGENT_SRC slave/gc.cpp slave/http.cpp slave/metrics.cpp - slave/monitor.cpp slave/paths.cpp slave/qos_controller.cpp slave/qos_controllers/noop.cpp http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/Makefile.am 
---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 55d3b34..ba9cc8b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -652,7 +652,6 @@ libmesos_no_3rdparty_la_SOURCES += \ slave/gc.cpp \ slave/http.cpp \ slave/metrics.cpp \ - slave/monitor.cpp \ slave/paths.cpp \ slave/qos_controller.cpp \ slave/qos_controllers/noop.cpp \ @@ -767,7 +766,6 @@ libmesos_no_3rdparty_la_SOURCES += \ slave/flags.hpp \ slave/gc.hpp \ slave/metrics.hpp \ - slave/monitor.hpp \ slave/paths.hpp \ slave/slave.hpp \ slave/state.hpp \ @@ -1887,7 +1885,6 @@ mesos_tests_SOURCES = \ tests/metrics_tests.cpp \ tests/module.cpp \ tests/module_tests.cpp \ - tests/monitor_tests.cpp \ tests/oversubscription_tests.cpp \ tests/partition_tests.cpp \ tests/paths_tests.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/slave/http.cpp ---------------------------------------------------------------------- diff --git a/src/slave/http.cpp b/src/slave/http.cpp index 7a872c6..a684ff5 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -27,6 +27,7 @@ #include <process/help.hpp> #include <process/owned.hpp> +#include <process/limiter.hpp> #include <process/metrics/metrics.hpp> @@ -548,6 +549,77 @@ Future<Response> Slave::Http::state( return OK(jsonify(state), request.url.query.get("jsonp")); } + +string Slave::Http::STATISTICS_HELP() +{ + return HELP( + TLDR( + "Retrieve resource monitoring information."), + DESCRIPTION( + "Returns the current resource consumption data for containers", + "running under this slave.", + "", + "Example:", + "", + "```", + "[{", + " \"executor_id\":\"executor\",", + " \"executor_name\":\"name\",", + " \"framework_id\":\"framework\",", + " \"source\":\"source\",", + " \"statistics\":", + " {", + " \"cpus_limit\":8.25,", + " \"cpus_nr_periods\":769021,", + " \"cpus_nr_throttled\":1046,", + " \"cpus_system_time_secs\":34501.45,", + " \"cpus_throttled_time_secs\":352.597023453,", + " 
\"cpus_user_time_secs\":96348.84,", + " \"mem_anon_bytes\":4845449216,", + " \"mem_file_bytes\":260165632,", + " \"mem_limit_bytes\":7650410496,", + " \"mem_mapped_file_bytes\":7159808,", + " \"mem_rss_bytes\":5105614848,", + " \"timestamp\":1388534400.0", + " }", + "}]", + "```")); +} + + +Future<Response> Slave::Http::statistics(const Request& request) const +{ + return statisticsLimiter->acquire() + .then(defer(slave->self(), &Slave::usage)) + .then([=](const Future<ResourceUsage>& usage) -> Future<Response> { + JSON::Array result; + + foreach (const ResourceUsage::Executor& executor, + usage.get().executors()) { + if (executor.has_statistics()) { + const ExecutorInfo info = executor.executor_info(); + + JSON::Object entry; + entry.values["framework_id"] = info.framework_id().value(); + entry.values["executor_id"] = info.executor_id().value(); + entry.values["executor_name"] = info.name(); + entry.values["source"] = info.source(); + entry.values["statistics"] = JSON::protobuf(executor.statistics()); + + result.values.push_back(entry); + } + } + + return process::http::OK(result, request.url.query.get("jsonp")); + }) + .repair([](const Future<Response>& future) { + LOG(WARNING) << "Could not collect resource usage: " + << (future.isFailed() ? future.failure() : "discarded"); + + return process::http::InternalServerError(); + }); +} + } // namespace slave { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/slave/monitor.cpp ---------------------------------------------------------------------- diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp deleted file mode 100644 index 5c1dd35..0000000 --- a/src/slave/monitor.cpp +++ /dev/null @@ -1,175 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include <string> - -#include <glog/logging.h> - -#include <process/collect.hpp> -#include <process/defer.hpp> -#include <process/future.hpp> -#include <process/help.hpp> -#include <process/http.hpp> -#include <process/limiter.hpp> -#include <process/process.hpp> - -#include <stout/json.hpp> -#include <stout/lambda.hpp> -#include <stout/protobuf.hpp> - -#include "slave/monitor.hpp" - -using namespace process; - -using std::string; - -namespace mesos { -namespace internal { -namespace slave { - -static const string STATISTICS_HELP() -{ - return HELP( - TLDR( - "Retrieve resource monitoring information."), - DESCRIPTION( - "Returns the current resource consumption data for containers", - "running under this slave.", - "", - "Example:", - "", - "```", - "[{", - " \"executor_id\":\"executor\",", - " \"executor_name\":\"name\",", - " \"framework_id\":\"framework\",", - " \"source\":\"source\",", - " \"statistics\":", - " {", - " \"cpus_limit\":8.25,", - " \"cpus_nr_periods\":769021,", - " \"cpus_nr_throttled\":1046,", - " \"cpus_system_time_secs\":34501.45,", - " \"cpus_throttled_time_secs\":352.597023453,", - " \"cpus_user_time_secs\":96348.84,", - " \"mem_anon_bytes\":4845449216,", - " \"mem_file_bytes\":260165632,", - " \"mem_limit_bytes\":7650410496,", - " \"mem_mapped_file_bytes\":7159808,", - " \"mem_rss_bytes\":5105614848,", - " \"timestamp\":1388534400.0", - " }", - "}]", - "```")); 
-} - - -class ResourceMonitorProcess : public Process<ResourceMonitorProcess> -{ -public: - explicit ResourceMonitorProcess( - const lambda::function<Future<ResourceUsage>()>& _usage) - : ProcessBase("monitor"), - usage(_usage), - limiter(2, Seconds(1)) {} // 2 permits per second. - - virtual ~ResourceMonitorProcess() {} - -protected: - virtual void initialize() - { - // TODO(ijimenez): Remove this endpoint at the end of the - // deprecation cycle on 0.26. - route("/statistics.json", - STATISTICS_HELP(), - &ResourceMonitorProcess::statistics); - - route("/statistics", - STATISTICS_HELP(), - &ResourceMonitorProcess::statistics); - } - -private: - // Returns the monitoring statistics. Requests have no parameters. - Future<http::Response> statistics(const http::Request& request) - { - return limiter.acquire() - .then(defer(self(), &Self::_statistics, request)); - } - - Future<http::Response> _statistics(const http::Request& request) - { - return usage() - .then(defer(self(), &Self::__statistics, lambda::_1, request)); - } - - Future<http::Response> __statistics( - const Future<ResourceUsage>& future, - const http::Request& request) - { - if (!future.isReady()) { - LOG(WARNING) << "Could not collect resource usage: " - << (future.isFailed() ? 
future.failure() : "discarded"); - - return http::InternalServerError(); - } - - JSON::Array result; - - foreach (const ResourceUsage::Executor& executor, - future.get().executors()) { - if (executor.has_statistics()) { - const ExecutorInfo info = executor.executor_info(); - - JSON::Object entry; - entry.values["framework_id"] = info.framework_id().value(); - entry.values["executor_id"] = info.executor_id().value(); - entry.values["executor_name"] = info.name(); - entry.values["source"] = info.source(); - entry.values["statistics"] = JSON::protobuf(executor.statistics()); - - result.values.push_back(entry); - } - } - - return http::OK(result, request.url.query.get("jsonp")); - } - - // Callback used to retrieve resource usage information from slave. - const lambda::function<Future<ResourceUsage>()> usage; - - // Used to rate limit the statistics endpoint. - RateLimiter limiter; -}; - - -ResourceMonitor::ResourceMonitor( - const lambda::function<Future<ResourceUsage>()>& usage) - : process(new ResourceMonitorProcess(usage)) -{ - spawn(process.get()); -} - - -ResourceMonitor::~ResourceMonitor() -{ - terminate(process.get()); - wait(process.get()); -} - -} // namespace slave { -} // namespace internal { -} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/slave/monitor.hpp ---------------------------------------------------------------------- diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp deleted file mode 100644 index 70a7c88..0000000 --- a/src/slave/monitor.hpp +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef __SLAVE_MONITOR_HPP__ -#define __SLAVE_MONITOR_HPP__ - -#include <mesos/mesos.hpp> - -#include <process/future.hpp> -#include <process/owned.hpp> - -#include <stout/lambda.hpp> - -namespace mesos { -namespace internal { -namespace slave { - -// Forward declarations. -class ResourceMonitorProcess; - - -// Exposes resources usage information via a JSON endpoint. -class ResourceMonitor -{ -public: - explicit ResourceMonitor( - const lambda::function<process::Future<ResourceUsage>()>& usage); - - ~ResourceMonitor(); - -private: - process::Owned<ResourceMonitorProcess> process; -}; - -} // namespace slave { -} // namespace internal { -} // namespace mesos { - -#endif // __SLAVE_MONITOR_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 60f93ca..cd4264e 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -141,7 +141,6 @@ Slave::Slave(const std::string& id, files(_files), metrics(*this), gc(_gc), - monitor(defer(self(), &Self::usage)), statusUpdateManager(_statusUpdateManager), masterPingTimeout(DEFAULT_MASTER_PING_TIMEOUT()), metaDir(paths::getMetaRootDir(flags.work_dir)), @@ -740,6 +739,16 @@ void Slave::initialize() [http](const process::http::Request& request) { return http.health(request); }); + route("/monitor/statistics", + Http::STATISTICS_HELP(), + [http](const process::http::Request& request) { + return http.statistics(request); + }); + 
route("/monitor/statistics.json", + Http::STATISTICS_HELP(), + [http](const process::http::Request& request) { + return http.statistics(request); + }); // Expose the log file for the webui. Fall back to 'log_dir' if // an explicit file was not specified. http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 3ba335f..5b6076a 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -42,8 +42,10 @@ #include <process/http.hpp> #include <process/future.hpp> #include <process/owned.hpp> +#include <process/limiter.hpp> #include <process/process.hpp> #include <process/protobuf.hpp> +#include <process/shared.hpp> #include <stout/bytes.hpp> #include <stout/linkedhashmap.hpp> @@ -71,7 +73,6 @@ #include "slave/flags.hpp" #include "slave/gc.hpp" #include "slave/metrics.hpp" -#include "slave/monitor.hpp" #include "slave/paths.hpp" #include "slave/state.hpp" @@ -396,7 +397,7 @@ public: mesos::slave::QoSCorrection>>& correction); // Returns the resource usage information for all executors. - process::Future<ResourceUsage> usage(); + virtual process::Future<ResourceUsage> usage(); // Handle the second phase of shutting down an executor for those // executors that have not properly shutdown within a timeout. @@ -421,7 +422,8 @@ private: class Http { public: - explicit Http(Slave* _slave) : slave(_slave) {} + explicit Http(Slave* _slave) + : slave(_slave), statisticsLimiter(new RateLimiter(2, Seconds(1))) {} // Logs the request, route handlers can compose this with the // desired request handler to get consistent request logging. 
@@ -445,13 +447,22 @@ private: const process::http::Request& request, const Option<std::string>& /* principal */) const; + // /slave/monitor/statistics + // /slave/monitor/statistics.json + process::Future<process::http::Response> statistics( + const process::http::Request& request) const; + static std::string EXECUTOR_HELP(); static std::string FLAGS_HELP(); static std::string HEALTH_HELP(); static std::string STATE_HELP(); + static std::string STATISTICS_HELP(); private: Slave* slave; + + // Used to rate limit the statistics endpoint. + Shared<RateLimiter> statisticsLimiter; }; friend struct Framework; @@ -532,8 +543,6 @@ private: GarbageCollector* gc; - ResourceMonitor monitor; - StatusUpdateManager* statusUpdateManager; // Master detection future. http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/tests/limiter.hpp ---------------------------------------------------------------------- diff --git a/src/tests/limiter.hpp b/src/tests/limiter.hpp index baf396c..16d5682 100644 --- a/src/tests/limiter.hpp +++ b/src/tests/limiter.hpp @@ -34,7 +34,7 @@ public: // be non-zero, but this value has no effect since this is a mock. 
MockRateLimiter() : process::RateLimiter(1, Seconds(1)) {} - MOCK_METHOD0(acquire, process::Future<Nothing>()); + MOCK_CONST_METHOD0(acquire, process::Future<Nothing>()); }; } // namespace tests { http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/tests/mesos.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp index cf38dbb..803b925 100644 --- a/src/tests/mesos.cpp +++ b/src/tests/mesos.cpp @@ -478,6 +478,8 @@ MockSlave::MockSlave( .WillRepeatedly(Invoke(this, &MockSlave::unmocked___recover)); EXPECT_CALL(*this, qosCorrections()) .WillRepeatedly(Invoke(this, &MockSlave::unmocked_qosCorrections)); + EXPECT_CALL(*this, usage()) + .WillRepeatedly(Invoke(this, &MockSlave::unmocked_usage)); } @@ -534,6 +536,12 @@ void MockSlave::unmocked_qosCorrections() } +process::Future<ResourceUsage> MockSlave::unmocked_usage() +{ + return slave::Slave::usage(); +} + + MockFetcherProcess::MockFetcherProcess() { // Set up default behaviors, calling the original methods. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 3b565b4..8c8cd1a 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -1304,6 +1304,10 @@ public: const process::Future<std::list< mesos::slave::QoSCorrection>>& correction)); + MOCK_METHOD0(usage, process::Future<ResourceUsage>()); + + process::Future<ResourceUsage> unmocked_usage(); + private: Files files; MockGarbageCollector gc; http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/tests/monitor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp deleted file mode 100644 index 5dcb248..0000000 --- a/src/tests/monitor_tests.cpp +++ /dev/null @@ -1,255 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#include <limits> -#include <vector> - -#include <gmock/gmock.h> - -#include <mesos/mesos.hpp> -#include <mesos/resources.hpp> - -#include <process/future.hpp> -#include <process/gmock.hpp> -#include <process/gtest.hpp> -#include <process/http.hpp> -#include <process/owned.hpp> -#include <process/pid.hpp> -#include <process/process.hpp> - -#include <stout/bytes.hpp> -#include <stout/json.hpp> -#include <stout/nothing.hpp> - -#include "slave/constants.hpp" -#include "slave/monitor.hpp" - -#include "tests/mesos.hpp" - -using namespace process; - -using mesos::internal::master::Master; - -using mesos::internal::slave::ResourceMonitor; -using mesos::internal::slave::Slave; - -using std::numeric_limits; -using std::vector; - -namespace mesos { -namespace internal { -namespace tests { - -TEST(MonitorTest, Statistics) -{ - FrameworkID frameworkId; - frameworkId.set_value("framework"); - - ExecutorID executorId; - executorId.set_value("executor"); - - ExecutorInfo executorInfo; - executorInfo.mutable_executor_id()->CopyFrom(executorId); - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.set_name("name"); - executorInfo.set_source("source"); - - ResourceStatistics statistics; - statistics.set_cpus_nr_periods(100); - statistics.set_cpus_nr_throttled(2); - statistics.set_cpus_user_time_secs(4); - statistics.set_cpus_system_time_secs(1); - statistics.set_cpus_throttled_time_secs(0.5); - statistics.set_cpus_limit(1.0); - statistics.set_mem_file_bytes(0); - statistics.set_mem_anon_bytes(0); - statistics.set_mem_mapped_file_bytes(0); - statistics.set_mem_rss_bytes(1024); - statistics.set_mem_limit_bytes(2048); - statistics.set_timestamp(0); - - ResourceMonitor monitor([=]() -> Future<ResourceUsage> { - Resources resources = Resources::parse("cpus:1;mem:2").get(); - - ResourceUsage usage; - ResourceUsage::Executor* executor = usage.add_executors(); - executor->mutable_executor_info()->CopyFrom(executorInfo); - 
executor->mutable_allocated()->CopyFrom(resources); - executor->mutable_statistics()->CopyFrom(statistics); - - return usage; - }); - - UPID upid("monitor", process::address()); - - Future<http::Response> response = http::get(upid, "statistics"); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); - AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); - - JSON::Array expected; - JSON::Object usage; - usage.values["executor_id"] = "executor"; - usage.values["executor_name"] = "name"; - usage.values["framework_id"] = "framework"; - usage.values["source"] = "source"; - usage.values["statistics"] = JSON::protobuf(statistics); - expected.values.push_back(usage); - - Try<JSON::Array> result = JSON::parse<JSON::Array>(response.get().body); - ASSERT_SOME(result); - ASSERT_EQ(expected, result.get()); -} - - -// This test verifies the correct handling of the statistics -// endpoint when there is no executor running. -TEST(MonitorTest, NoExecutor) -{ - ResourceMonitor monitor([]() -> Future<ResourceUsage> { - return ResourceUsage(); - }); - - UPID upid("monitor", process::address()); - - Future<http::Response> response = http::get(upid, "statistics"); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); - AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); - AWAIT_EXPECT_RESPONSE_BODY_EQ("[]", response); -} - - -// This test verifies the correct handling of the statistics -// endpoint when statistics is missing in ResourceUsage. 
-TEST(MonitorTest, MissingStatistics) -{ - ResourceMonitor monitor([]() -> Future<ResourceUsage> { - FrameworkID frameworkId; - frameworkId.set_value("framework"); - - ExecutorID executorId; - executorId.set_value("executor"); - - ExecutorInfo executorInfo; - executorInfo.mutable_executor_id()->CopyFrom(executorId); - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.set_name("name"); - executorInfo.set_source("source"); - - Resources resources = Resources::parse("cpus:1;mem:2").get(); - - ResourceUsage usage; - ResourceUsage::Executor* executor = usage.add_executors(); - executor->mutable_executor_info()->CopyFrom(executorInfo); - executor->mutable_allocated()->CopyFrom(resources); - - return usage; - }); - - UPID upid("monitor", process::address()); - - Future<http::Response> response = http::get(upid, "statistics"); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); - AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); - AWAIT_EXPECT_RESPONSE_BODY_EQ("[]", response); -} - - -class MonitorIntegrationTest : public MesosTest {}; - - -// This is an end-to-end test that verifies that the slave returns the -// correct ResourceUsage based on the currently running executors, and -// the values returned by the statistics endpoint are as expected. -TEST_F(MonitorIntegrationTest, RunningExecutor) -{ - Try<Owned<cluster::Master>> master = StartMaster(); - ASSERT_SOME(master); - - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - Future<vector<Offer>> offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. 
- - driver.start(); - - AWAIT_READY(offers); - EXPECT_FALSE(offers.get().empty()); - - const Offer& offer = offers.get()[0]; - - // Launch a task and wait until it is in RUNNING status. - TaskInfo task = createTask( - offer.slave_id(), - Resources::parse("cpus:1;mem:32").get(), - "sleep 1000"); - - Future<TaskStatus> status; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status)); - - driver.launchTasks(offer.id(), {task}); - - AWAIT_READY(status); - EXPECT_EQ(task.task_id(), status.get().task_id()); - EXPECT_EQ(TASK_RUNNING, status.get().state()); - - // Hit the statistics endpoint and expect the response contains the - // resource statistics for the running container. - UPID upid("monitor", process::address()); - - Future<http::Response> response = http::get(upid, "statistics"); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); - AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); - - // Verify that the statistics in the response contains the proper - // resource limits for the container. 
- Try<JSON::Value> value = JSON::parse(response.get().body); - ASSERT_SOME(value); - - Try<JSON::Value> expected = JSON::parse(strings::format( - "[{" - "\"statistics\":{" - "\"cpus_limit\":%g," - "\"mem_limit_bytes\":%lu" - "}" - "}]", - 1 + slave::DEFAULT_EXECUTOR_CPUS, - (Megabytes(32) + slave::DEFAULT_EXECUTOR_MEM).bytes()).get()); - - ASSERT_SOME(expected); - EXPECT_TRUE(value.get().contains(expected.get())); - - driver.stop(); - driver.join(); -} - -} // namespace tests { -} // namespace internal { -} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/tests/oversubscription_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp index ba03681..84e1104 100644 --- a/src/tests/oversubscription_tests.cpp +++ b/src/tests/oversubscription_tests.cpp @@ -41,7 +41,6 @@ #include "module/manager.hpp" #include "slave/flags.hpp" -#include "slave/monitor.hpp" #include "slave/slave.hpp" #include "slave/qos_controllers/load.hpp" @@ -55,7 +54,6 @@ using namespace process; using mesos::internal::master::Master; using mesos::internal::slave::LoadQoSController; -using mesos::internal::slave::ResourceMonitor; using mesos::internal::slave::Slave; using mesos::slave::QoSCorrection; @@ -173,9 +171,8 @@ private: // This test verifies that the ResourceEstimator is able to fetch -// ResourceUsage statistics about running executor from -// the ResourceMonitor. -TEST_F(OversubscriptionTest, FetchResourceUsageFromMonitor) +// ResourceUsage statistics about running executor. +TEST_F(OversubscriptionTest, FetchResourceUsage) { Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); @@ -618,9 +615,8 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator) // This test verifies that the QoS Controller is able to fetch -// ResourceUsage statistics about running executor from -// the ResourceMonitor. 
-TEST_F(OversubscriptionTest, QoSFetchResourceUsageFromMonitor) +// ResourceUsage statistics about running executor. +TEST_F(OversubscriptionTest, QoSFetchResourceUsage) { Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); http://git-wip-us.apache.org/repos/asf/mesos/blob/4e86a8c1/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 57fc503..c4d80aa 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -1599,6 +1599,221 @@ TEST_F(SlaveTest, HTTPEndpointsBadAuthentication) } +// This test verifies correct handling of the statistics endpoint when +// there is no executor running. +TEST_F(SlaveTest, StatisticsEndpointNoExecutor) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + Future<Response> response = process::http::get( + slave.get()->pid, + "/monitor/statistics", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + AWAIT_EXPECT_RESPONSE_BODY_EQ("[]", response); +} + + +// This test verifies the correct handling of the statistics +// endpoint when statistics is missing in ResourceUsage.
+TEST_F(SlaveTest, StatisticsEndpointMissingStatistics) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + TestContainerizer containerizer(&exec); + StandaloneMasterDetector detector(master.get()->pid); + + MockSlave slave(CreateSlaveFlags(), &detector, &containerizer); + spawn(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(_, _, _)); + EXPECT_CALL(exec, registered(_, _, _, _)); + + Future<vector<Offer>> offers; + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + const Offer& offer = offers.get()[0]; + + TaskInfo task = createTask( + offer.slave_id(), + Resources::parse("cpus:0.1;mem:32").get(), + "sleep 1000", + exec.id); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + driver.launchTasks(offer.id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(TASK_RUNNING, status.get().state()); + + // Set up the containerizer so the next usage() will fail. 
+ EXPECT_CALL(containerizer, usage(_)) + .WillOnce(Return(Failure("Injected failure"))); + + Future<Response> response = process::http::get( + slave.self(), + "monitor/statistics", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_READY(response); + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + AWAIT_EXPECT_RESPONSE_BODY_EQ("[]", response); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + terminate(slave); + wait(slave); +} + + +// This test verifies the correct response of /monitor/statistics endpoint +// when ResourceUsage collection fails. +TEST_F(SlaveTest, StatisticsEndpointGetResourceUsageFailed) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + TestContainerizer containerizer(&exec); + StandaloneMasterDetector detector(master.get()->pid); + + MockSlave slave(CreateSlaveFlags(), &detector, &containerizer); + + EXPECT_CALL(slave, usage()) + .WillOnce(Return(Failure("Resource Collection Failure"))); + + spawn(slave); + + Future<Response> response = process::http::get( + slave.self(), + "monitor/statistics", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_READY(response); + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::InternalServerError().status, response); + + terminate(slave); + wait(slave); +} + + +// This is an end-to-end test that verifies that the slave returns the +// correct ResourceUsage based on the currently running executors, and +// the values returned by the /monitor/statistics endpoint are as expected. 
+TEST_F(SlaveTest, StatisticsEndpointRunningExecutor) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_FALSE(offers.get().empty()); + + const Offer& offer = offers.get()[0]; + + // Launch a task and wait until it is in RUNNING status. + TaskInfo task = createTask( + offer.slave_id(), + Resources::parse("cpus:1;mem:32").get(), + "sleep 1000"); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + driver.launchTasks(offer.id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(task.task_id(), status.get().task_id()); + EXPECT_EQ(TASK_RUNNING, status.get().state()); + + // Hit the statistics endpoint and expect the response contains the + // resource statistics for the running container. + Future<Response> response = process::http::get( + slave.get()->pid, + "monitor/statistics", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + + // Verify that the statistics in the response contains the proper + // resource limits for the container. 
+ Try<JSON::Value> value = JSON::parse(response.get().body); + ASSERT_SOME(value); + + Try<JSON::Value> expected = JSON::parse(strings::format( + "[{" + "\"statistics\":{" + "\"cpus_limit\":%g," + "\"mem_limit_bytes\":%lu" + "}" + "}]", + 1 + slave::DEFAULT_EXECUTOR_CPUS, + (Megabytes(32) + slave::DEFAULT_EXECUTOR_MEM).bytes()).get()); + + ASSERT_SOME(expected); + EXPECT_TRUE(value.get().contains(expected.get())); + + driver.stop(); + driver.join(); +} + + // This test ensures that when a slave is shutting down, it will not // try to re-register with the master. TEST_F(SlaveTest, DISABLED_TerminatingSlaveDoesNotReregister)