Added metrics for slave shutdowns. Review: https://reviews.apache.org/r/30584
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3048e5e1 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3048e5e1 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3048e5e1 Branch: refs/heads/master Commit: 3048e5e1686a5ae0a0f04fd30fdda0a380e9d13d Parents: 886efef Author: Vinod Kone <[email protected]> Authored: Tue Feb 3 14:34:28 2015 -0800 Committer: Vinod Kone <[email protected]> Committed: Wed Feb 4 16:27:49 2015 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 125 +++++++++++++++++++++++--------------------- src/master/master.hpp | 4 +- src/master/metrics.cpp | 12 ++++- src/master/metrics.hpp | 4 ++ 4 files changed, 84 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index e42b922..234bbec 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -118,13 +118,15 @@ public: const SlaveInfo& _slaveInfo, const SlaveID& _slaveId, const PID<Master>& _master, - const Option<shared_ptr<RateLimiter>>& _limiter) + const Option<shared_ptr<RateLimiter>>& _limiter, + const shared_ptr<Metrics> _metrics) : ProcessBase(process::ID::generate("slave-observer")), slave(_slave), slaveInfo(_slaveInfo), slaveId(_slaveId), master(_master), limiter(_limiter), + metrics(_metrics), timeouts(0), pinged(false), connected(true) @@ -214,6 +216,8 @@ protected: acquire = limiter.get()->acquire(); } + ++metrics->slave_shutdowns_scheduled; + shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown)); } @@ -236,6 +240,8 @@ protected: } else if (future.isDiscarded()) { LOG(INFO) << "Canceling shutdown of slave " << slaveId << " since a pong is received!"; + + ++metrics->slave_shutdowns_canceled; } shuttingDown = None(); @@ -247,6 +253,7 @@ private: const SlaveID slaveId; const PID<Master> master; const Option<shared_ptr<RateLimiter>> limiter; + shared_ptr<Metrics> metrics; Option<Future<Nothing>> shuttingDown; uint32_t timeouts; bool pinged; @@ -273,7 +280,7 @@ Master::Master( contender(_contender), detector(_detector), authorizer(_authorizer), - metrics(*this), + metrics(new Metrics(*this)), electedTime(None()) { // NOTE: We populate 'info_' here instead of inside 'initialize()' @@ -977,9 +984,9 @@ void Master::visit(const MessageEvent& event) // 'Master::Frameworks::principals' for details. if (principal.isSome()) { // If the framework has a principal, the counter must exist. - CHECK(metrics.frameworks.contains(principal.get())); + CHECK(metrics->frameworks.contains(principal.get())); Counter messages_received = - metrics.frameworks.get(principal.get()).get()->messages_received; + metrics->frameworks.get(principal.get()).get()->messages_received; ++messages_received; } @@ -987,7 +994,7 @@ void Master::visit(const MessageEvent& event) if (!elected()) { VLOG(1) << "Dropping '" << event.message->name << "' message since " << "not elected yet"; - ++metrics.dropped_messages; + ++metrics->dropped_messages; return; } @@ -1001,7 +1008,7 @@ void Master::visit(const MessageEvent& event) if (!recovered.get().isReady()) { VLOG(1) << "Dropping '" << event.message->name << "' message since " << "not recovered yet"; - ++metrics.dropped_messages; + ++metrics->dropped_messages; return; } @@ -1121,9 +1128,9 @@ void Master::_visit(const MessageEvent& event) // Note that it could be removed in handling // 'UnregisterFrameworkMessage' if it's the last framework with // this principal. - if (principal.isSome() && metrics.frameworks.contains(principal.get())) { + if (principal.isSome() && metrics->frameworks.contains(principal.get())) { Counter messages_processed = - metrics.frameworks.get(principal.get()).get()->messages_processed; + metrics->frameworks.get(principal.get()).get()->messages_processed; ++messages_processed; } } @@ -1249,7 +1256,7 @@ void Master::recoveredSlavesTimeout(const Registry& registry) << " (" << slave.info().hostname() << ") did not re-register " << "within the timeout; removing it from the registrar"; - ++metrics.recovery_slave_removals; + ++metrics->recovery_slave_removals; slaves.recovered.erase(slave.info().id()); @@ -1542,7 +1549,7 @@ void Master::registerFramework( const UPID& from, const FrameworkInfo& frameworkInfo) { - ++metrics.messages_register_framework; + ++metrics->messages_register_framework; if (authenticating.contains(from)) { // TODO(vinod): Consider dropping this request and fix the tests @@ -1652,7 +1659,7 @@ void Master::reregisterFramework( const FrameworkInfo& frameworkInfo, bool failover) { - ++metrics.messages_reregister_framework; + ++metrics->messages_reregister_framework; if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { LOG(ERROR) << "Framework '" << frameworkInfo.name() << "' at " << from @@ -1881,7 +1888,7 @@ void Master::unregisterFramework( const UPID& from, const FrameworkID& frameworkId) { - ++metrics.messages_unregister_framework; + ++metrics->messages_unregister_framework; LOG(INFO) << "Asked to unregister framework " << frameworkId; @@ -1902,7 +1909,7 @@ void Master::deactivateFramework( const UPID& from, const FrameworkID& frameworkId) { - ++metrics.messages_deactivate_framework; + ++metrics->messages_deactivate_framework; Framework* framework = getFramework(frameworkId); @@ -2005,7 +2012,7 @@ void Master::resourceRequest( const FrameworkID& frameworkId, const vector<Request>& requests) { - ++metrics.messages_resource_request; + ++metrics->messages_resource_request; Framework* framework = getFramework(frameworkId); @@ -2036,9 +2043,9 @@ void Master::launchTasks( const vector<OfferID>& offerIds) { if (!tasks.empty()) { - ++metrics.messages_launch_tasks; + ++metrics->messages_launch_tasks; } else { - ++metrics.messages_decline_offers; + ++metrics->messages_decline_offers; } Framework* framework = getFramework(frameworkId); @@ -2235,7 +2242,7 @@ void Master::accept( "Task launched with invalid offers: " + error.get().message, TaskStatus::REASON_INVALID_OFFERS); - metrics.tasks_lost++; + metrics->tasks_lost++; stats.tasks[TASK_LOST]++; forward(update, UPID(), framework); @@ -2339,7 +2346,7 @@ void Master::_accept( TaskStatus::REASON_SLAVE_REMOVED : TaskStatus::REASON_SLAVE_DISCONNECTED); - metrics.tasks_lost++; + metrics->tasks_lost++; stats.tasks[TASK_LOST]++; forward(update, UPID(), framework); @@ -2472,7 +2479,7 @@ void Master::_accept( "Not authorized to launch as user '" + user + "'", TaskStatus::REASON_TASK_UNAUTHORIZED); - metrics.tasks_error++; + metrics->tasks_error++; stats.tasks[TASK_ERROR]++; forward(update, UPID(), framework); @@ -2497,7 +2504,7 @@ void Master::_accept( validationError.get().message, TaskStatus::REASON_TASK_INVALID); - metrics.tasks_error++; + metrics->tasks_error++; stats.tasks[TASK_ERROR]++; forward(update, UPID(), framework); @@ -2554,7 +2561,7 @@ void Master::_accept( void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId) { - ++metrics.messages_revive_offers; + ++metrics->messages_revive_offers; Framework* framework = getFramework(frameworkId); @@ -2582,7 +2589,7 @@ void Master::killTask( const FrameworkID& frameworkId, const TaskID& taskId) { - ++metrics.messages_kill_task; + ++metrics->messages_kill_task; LOG(INFO) << "Asked to kill task " << taskId << " of framework " << frameworkId; @@ -2668,7 +2675,7 @@ void Master::statusUpdateAcknowledgement( const TaskID& taskId, const string& uuid) { - metrics.messages_status_update_acknowledgement++; + metrics->messages_status_update_acknowledgement++; // TODO(bmahler): Consider adding a message validator abstraction // for the master that takes care of all this boilerplate. Ideally @@ -2684,7 +2691,7 @@ void Master::statusUpdateAcknowledgement( << "Ignoring status update acknowledgement message for task " << taskId << " of framework " << frameworkId << " on slave " << slaveId << " because the framework cannot be found"; - metrics.invalid_status_update_acknowledgements++; + metrics->invalid_status_update_acknowledgements++; return; } @@ -2693,7 +2700,7 @@ void Master::statusUpdateAcknowledgement( << "Ignoring status update acknowledgement message for task " << taskId << " of framework " << *framework << " on slave " << slaveId << " because it is not expected from " << from; - metrics.invalid_status_update_acknowledgements++; + metrics->invalid_status_update_acknowledgements++; return; } @@ -2704,7 +2711,7 @@ void Master::statusUpdateAcknowledgement( << "Cannot send status update acknowledgement message for task " << taskId << " of framework " << *framework << " to slave " << slaveId << " because slave is not registered"; - metrics.invalid_status_update_acknowledgements++; + metrics->invalid_status_update_acknowledgements++; return; } @@ -2713,7 +2720,7 @@ void Master::statusUpdateAcknowledgement( << "Cannot send status update acknowledgement message for task " << taskId << " of framework " << *framework << " to slave " << *slave << " because slave is disconnected"; - metrics.invalid_status_update_acknowledgements++; + metrics->invalid_status_update_acknowledgements++; return; } @@ -2737,7 +2744,7 @@ void Master::statusUpdateAcknowledgement( << "Ignoring status update acknowledgement message for task " << taskId << " of framework " << *framework << " to slave " << *slave << " because it no update was sent by this master"; - metrics.invalid_status_update_acknowledgements++; + metrics->invalid_status_update_acknowledgements++; return; } @@ -2760,7 +2767,7 @@ void Master::statusUpdateAcknowledgement( send(slave->pid, message); - metrics.valid_status_update_acknowledgements++; + metrics->valid_status_update_acknowledgements++; } @@ -2771,7 +2778,7 @@ void Master::schedulerMessage( const ExecutorID& executorId, const string& data) { - ++metrics.messages_framework_to_executor; + ++metrics->messages_framework_to_executor; Framework* framework = getFramework(frameworkId); @@ -2781,7 +2788,7 @@ void Master::schedulerMessage( << " of framework " << frameworkId << " because the framework cannot be found"; stats.invalidFrameworkMessages++; - metrics.invalid_framework_to_executor_messages++; + metrics->invalid_framework_to_executor_messages++; return; } @@ -2791,7 +2798,7 @@ void Master::schedulerMessage( << " of framework " << *framework << " because it is not expected from " << from; stats.invalidFrameworkMessages++; - metrics.invalid_framework_to_executor_messages++; + metrics->invalid_framework_to_executor_messages++; return; } @@ -2801,7 +2808,7 @@ void Master::schedulerMessage( << *framework << " to slave " << slaveId << " because slave is not registered"; stats.invalidFrameworkMessages++; - metrics.invalid_framework_to_executor_messages++; + metrics->invalid_framework_to_executor_messages++; return; } @@ -2810,7 +2817,7 @@ void Master::schedulerMessage( << *framework << " to slave " << *slave << " because slave is disconnected"; stats.invalidFrameworkMessages++; - metrics.invalid_framework_to_executor_messages++; + metrics->invalid_framework_to_executor_messages++; return; } @@ -2825,7 +2832,7 @@ void Master::schedulerMessage( send(slave->pid, message); stats.validFrameworkMessages++; - metrics.valid_framework_to_executor_messages++; + metrics->valid_framework_to_executor_messages++; } @@ -2835,7 +2842,7 @@ void Master::registerSlave( const vector<Resource>& checkpointedResources, const string& version) { - ++metrics.messages_register_slave; + ++metrics->messages_register_slave; if (authenticating.contains(from)) { LOG(INFO) << "Queuing up registration request from " << from @@ -2953,7 +2960,7 @@ void Master::_registerSlave( Clock::now(), checkpointedResources); - ++metrics.slave_registrations; + ++metrics->slave_registrations; addSlave(slave); @@ -2976,7 +2983,7 @@ void Master::reregisterSlave( const vector<Archive::Framework>& completedFrameworks, const string& version) { - ++metrics.messages_reregister_slave; + ++metrics->messages_reregister_slave; if (authenticating.contains(from)) { LOG(INFO) << "Queuing up re-registration request from " << from @@ -3149,7 +3156,7 @@ void Master::_reregisterSlave( slave->reregisteredTime = Clock::now(); - ++metrics.slave_reregistrations; + ++metrics->slave_reregistrations; addSlave(slave, completedFrameworks); @@ -3198,7 +3205,7 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks) void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId) { - ++metrics.messages_unregister_slave; + ++metrics->messages_unregister_slave; LOG(INFO) << "Asked to unregister slave " << slaveId; @@ -3219,7 +3226,7 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId) // because the status updates will be sent by the slave. void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) { - ++metrics.messages_status_update; + ++metrics->messages_status_update; if (slaves.removed.get(update.slave_id()).isSome()) { // If the slave is removed, we have already informed @@ -3235,7 +3242,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) send(pid, message); stats.invalidStatusUpdates++; - metrics.invalid_status_updates++; + metrics->invalid_status_updates++; return; } @@ -3246,7 +3253,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) << " from unknown slave " << pid << " with id " << update.slave_id(); stats.invalidStatusUpdates++; - metrics.invalid_status_updates++; + metrics->invalid_status_updates++; return; } @@ -3257,7 +3264,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) << " from slave " << *slave << " because the framework is unknown"; stats.invalidStatusUpdates++; - metrics.invalid_status_updates++; + metrics->invalid_status_updates++; return; } @@ -3272,7 +3279,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) LOG(WARNING) << "Could not lookup task for status update " << update << " from slave " << *slave; stats.invalidStatusUpdates++; - metrics.invalid_status_updates++; + metrics->invalid_status_updates++; return; } @@ -3285,7 +3292,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) } stats.validStatusUpdates++; - metrics.valid_status_updates++; + metrics->valid_status_updates++; } @@ -3319,7 +3326,7 @@ void Master::exitedExecutor( const ExecutorID& executorId, int32_t status) { - ++metrics.messages_exited_executor; + ++metrics->messages_exited_executor; if (slaves.removed.get(slaveId).isSome()) { // If the slave is removed, we have already informed @@ -3394,7 +3401,7 @@ void Master::reconcileTasks( const FrameworkID& frameworkId, const std::vector<TaskStatus>& statuses) { - ++metrics.messages_reconcile_tasks; + ++metrics->messages_reconcile_tasks; Framework* framework = getFramework(frameworkId); if (framework == NULL) { @@ -3735,7 +3742,7 @@ void Master::offer(const FrameworkID& frameworkId, // 'authenticate' message doesn't contain the 'FrameworkID'. void Master::authenticate(const UPID& from, const UPID& pid) { - ++metrics.messages_authenticate; + ++metrics->messages_authenticate; // An authentication request is sent by a client (slave/framework) // in the following cases: @@ -4073,8 +4080,8 @@ void Master::addFramework(Framework* framework) if (principal.isSome()) { // Create new framework metrics if this framework is the first // one of this principal. Otherwise existing metrics are reused. - if (!metrics.frameworks.contains(principal.get())) { - metrics.frameworks.put( + if (!metrics->frameworks.contains(principal.get())) { + metrics->frameworks.put( principal.get(), Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get()))); } @@ -4252,8 +4259,8 @@ void Master::removeFramework(Framework* framework) // Remove the metrics for the principal if this framework is the // last one with this principal. if (!frameworks.principals.containsValue(principal.get())) { - CHECK(metrics.frameworks.contains(principal.get())); - metrics.frameworks.erase(principal.get()); + CHECK(metrics->frameworks.contains(principal.get())); + metrics->frameworks.erase(principal.get()); } } @@ -4321,7 +4328,7 @@ void Master::addSlave( // Set up an observer for the slave. slave->observer = new SlaveObserver( - slave->pid, slave->info, slave->id, self(), slaves.limiter); + slave->pid, slave->info, slave->id, self(), slaves.limiter, metrics); spawn(slave->observer); @@ -4496,7 +4503,7 @@ void Master::_removeSlave( LOG(INFO) << "Removed slave " << slaveInfo.id() << " (" << slaveInfo.hostname() << ")"; - ++metrics.slave_removals; + ++metrics->slave_removals; // Forward the LOST updates on to the framework. foreach (const StatusUpdate& update, updates) { @@ -4611,10 +4618,10 @@ void Master::updateTask(Task* task, const StatusUpdate& update) } switch (task->state()) { - case TASK_FINISHED: ++metrics.tasks_finished; break; - case TASK_FAILED: ++metrics.tasks_failed; break; - case TASK_KILLED: ++metrics.tasks_killed; break; - case TASK_LOST: ++metrics.tasks_lost; break; + case TASK_FINISHED: ++metrics->tasks_finished; break; + case TASK_FAILED: ++metrics->tasks_failed; break; + case TASK_KILLED: ++metrics->tasks_killed; break; + case TASK_LOST: ++metrics->tasks_lost; break; default: break; } } http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 5a5c86f..dcfd38a 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -666,7 +666,9 @@ private: uint64_t invalidFrameworkMessages; } stats; - Metrics metrics; + // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is + // thread safe. + memory::shared_ptr<Metrics> metrics; // Gauge handlers. double _uptime_secs() http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/metrics.cpp ---------------------------------------------------------------------- diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp index 956fe50..a5cde16 100644 --- a/src/master/metrics.cpp +++ b/src/master/metrics.cpp @@ -148,7 +148,11 @@ Metrics::Metrics(const Master& master) slave_reregistrations( "master/slave_reregistrations"), slave_removals( - "master/slave_removals") + "master/slave_removals"), + slave_shutdowns_scheduled( + "master/slave_shutdowns_scheduled"), + slave_shutdowns_canceled( + "master/slave_shutdowns_canceled") { // TODO(dhamon): Check return values of 'add'. process::metrics::add(uptime_secs); @@ -220,6 +224,9 @@ Metrics::Metrics(const Master& master) process::metrics::add(slave_reregistrations); process::metrics::add(slave_removals); + process::metrics::add(slave_shutdowns_scheduled); + process::metrics::add(slave_shutdowns_canceled); + // Create resource gauges. // TODO(dhamon): Set these up dynamically when adding a slave based on the // resources the slave exposes. @@ -319,6 +326,9 @@ Metrics::~Metrics() process::metrics::remove(slave_reregistrations); process::metrics::remove(slave_removals); + process::metrics::remove(slave_shutdowns_scheduled); + process::metrics::remove(slave_shutdowns_canceled); + foreach (const process::metrics::Gauge& gauge, resources_total) { process::metrics::remove(gauge); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/metrics.hpp ---------------------------------------------------------------------- diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp index 6a43abc..5e18f88 100644 --- a/src/master/metrics.hpp +++ b/src/master/metrics.hpp @@ -156,6 +156,10 @@ struct Metrics process::metrics::Counter slave_reregistrations; process::metrics::Counter slave_removals; + // Slave observer metrics. + process::metrics::Counter slave_shutdowns_scheduled; + process::metrics::Counter slave_shutdowns_canceled; + // Resource metrics. std::vector<process::metrics::Gauge> resources_total; std::vector<process::metrics::Gauge> resources_used;
