Repository: mesos Updated Branches: refs/heads/master bb0d506b5 -> c9a218921
Fixed scheduler message metrics to work with scheduler Calls. Review: https://reviews.apache.org/r/37244 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c9a21892 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c9a21892 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c9a21892 Branch: refs/heads/master Commit: c9a21892134cb8e5aae96a15bf33070426422475 Parents: c5ea91e Author: Vinod Kone <[email protected]> Authored: Sat Aug 8 08:19:46 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Sat Aug 8 19:38:49 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 53 +++++++++++++++++++++++++---------------- src/master/metrics.hpp | 2 ++ src/tests/master_tests.cpp | 5 ++++ 3 files changed, 39 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c9a21892/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index b481c32..08dd34d 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2269,8 +2269,6 @@ void Master::unregisterFramework( const UPID& from, const FrameworkID& frameworkId) { - ++metrics->messages_unregister_framework; - LOG(INFO) << "Asked to unregister framework " << frameworkId; Framework* framework = getFramework(frameworkId); @@ -2401,8 +2399,6 @@ void Master::resourceRequest( const FrameworkID& frameworkId, const vector<Request>& requests) { - ++metrics->messages_resource_request; - Framework* framework = getFramework(frameworkId); if (framework == NULL) { @@ -2436,6 +2432,8 @@ void Master::request( LOG(INFO) << "Processing REQUEST call for framework " << *framework; + ++metrics->messages_resource_request; + allocator->requestResources( framework->id(), google::protobuf::convert(request.requests())); @@ -2449,12 +2447,6 @@ void Master::launchTasks( const Filters& filters, const vector<OfferID>& offerIds) { - if (!tasks.empty()) { - ++metrics->messages_launch_tasks; - } else { - ++metrics->messages_decline_offers; - } - Framework* framework = getFramework(frameworkId); if (framework == NULL) { @@ -2602,7 +2594,19 @@ void Master::accept( Framework* framework, const scheduler::Call::Accept& accept) { - // TODO(jieyu): Update metrics for ACCEPT calls. + CHECK_NOTNULL(framework); + + foreach (const Offer::Operation& operation, accept.operations()) { + if (operation.type() == Offer::Operation::LAUNCH) { + if (operation.launch().task_infos().size() > 0) { + ++metrics->messages_launch_tasks; + } else { + ++metrics->messages_decline_offers; + } + } + + // TODO(jieyu): Add metrics for non launch operations. + } // TODO(bmahler): We currently only support using multiple offers // for a single slave. @@ -3064,6 +3068,8 @@ void Master::decline( LOG(INFO) << "Processing DECLINE call for offers: " << decline.offer_ids() << " for framework " << *framework; + ++metrics->messages_decline_offers; + // Return resources to the allocator. foreach (const OfferID& offerId, decline.offer_ids()) { Offer* offer = getOffer(offerId); @@ -3085,8 +3091,6 @@ void Master::decline( void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId) { - ++metrics->messages_revive_offers; - Framework* framework = getFramework(frameworkId); if (framework == NULL) { @@ -3112,6 +3116,9 @@ void Master::revive(Framework* framework) CHECK_NOTNULL(framework); LOG(INFO) << "Processing REVIVE call for framework " << *framework; + + ++metrics->messages_revive_offers; + allocator->reviveOffers(framework->id()); } @@ -3124,8 +3131,6 @@ void Master::killTask( LOG(INFO) << "Asked to kill task " << taskId << " of framework " << frameworkId; - ++metrics->messages_kill_task; - Framework* framework = getFramework(frameworkId); if (framework == NULL) { @@ -3153,6 +3158,8 @@ void Master::kill(Framework* framework, const scheduler::Call::Kill& kill) { CHECK_NOTNULL(framework); + ++metrics->messages_kill_task; + const TaskID& taskId = kill.task_id(); const Option<SlaveID> slaveId = kill.has_slave_id() ? Option<SlaveID>(kill.slave_id()) : None(); @@ -3235,8 +3242,6 @@ void Master::statusUpdateAcknowledgement( const TaskID& taskId, const string& uuid) { - metrics->messages_status_update_acknowledgement++; - // TODO(bmahler): Consider adding a message validator abstraction // for the master that takes care of all this boilerplate. Ideally // by the time we process messages in the critical master code, we @@ -3279,6 +3284,8 @@ void Master::acknowledge( { CHECK_NOTNULL(framework); + metrics->messages_status_update_acknowledgement++; + const SlaveID slaveId = acknowledge.slave_id(); const TaskID taskId = acknowledge.task_id(); const UUID uuid = UUID::fromBytes(acknowledge.uuid()); @@ -3357,8 +3364,6 @@ void Master::schedulerMessage( const ExecutorID& executorId, const string& data) { - metrics->messages_framework_to_executor++; - Framework* framework = getFramework(frameworkId); if (framework == NULL) { @@ -3458,6 +3463,8 @@ void Master::message( { CHECK_NOTNULL(framework); + metrics->messages_framework_to_executor++; + Slave* slave = slaves.registered.get(message.slave_id()); if (slave == NULL) { @@ -4133,6 +4140,8 @@ void Master::shutdown( { CHECK_NOTNULL(framework); + // TODO(vinod): Add a metric for executor shutdowns. + if (!slaves.registered.contains(shutdown.slave_id())) { LOG(WARNING) << "Unable to shutdown executor '" << shutdown.executor_id() << "' of framework " << framework->id() @@ -4201,8 +4210,6 @@ void Master::reconcileTasks( const FrameworkID& frameworkId, const std::vector<TaskStatus>& statuses) { - ++metrics->messages_reconcile_tasks; - Framework* framework = getFramework(frameworkId); if (framework == NULL) { LOG(WARNING) << "Unknown framework " << frameworkId << " at " << from @@ -4227,6 +4234,8 @@ void Master::_reconcileTasks( { CHECK_NOTNULL(framework); + ++metrics->messages_reconcile_tasks; + if (statuses.empty()) { // Implicit reconciliation. LOG(INFO) << "Performing implicit task state reconciliation" @@ -5034,6 +5043,8 @@ void Master::teardown(Framework* framework) LOG(INFO) << "Processing TEARDOWN call for framework " << *framework; + ++metrics->messages_unregister_framework; + removeFramework(framework); } http://git-wip-us.apache.org/repos/asf/mesos/blob/c9a21892/src/master/metrics.hpp ---------------------------------------------------------------------- diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp index 5e96a5f..c51887e 100644 --- a/src/master/metrics.hpp +++ b/src/master/metrics.hpp @@ -131,6 +131,8 @@ struct Metrics process::metrics::Counter messages_revive_offers; process::metrics::Counter messages_reconcile_tasks; process::metrics::Counter messages_framework_to_executor; + + // Messages from executors. process::metrics::Counter messages_executor_to_framework; // Messages from slaves. http://git-wip-us.apache.org/repos/asf/mesos/blob/c9a21892/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 0c8e8be..a4703af 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -1463,6 +1463,8 @@ TEST_F(MasterTest, LaunchDuplicateOfferTest) } +// TODO(vinod): These tests only verify that the master metrics exist +// but we need tests that verify that these metrics are updated. TEST_F(MasterTest, MetricsInMetricsEndpoint) { Try<PID<Master>> master = StartMaster(); @@ -1512,6 +1514,9 @@ TEST_F(MasterTest, MetricsInMetricsEndpoint) EXPECT_EQ(1u, snapshot.values.count("master/messages_reconcile_tasks")); EXPECT_EQ(1u, snapshot.values.count("master/messages_framework_to_executor")); + // Messages from executors. + EXPECT_EQ(1u, snapshot.values.count("master/messages_executor_to_framework")); + // Messages from slaves. EXPECT_EQ(1u, snapshot.values.count("master/messages_register_slave")); EXPECT_EQ(1u, snapshot.values.count("master/messages_reregister_slave"));
