Added metrics for slave shutdowns.

Review: https://reviews.apache.org/r/30584


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3048e5e1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3048e5e1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3048e5e1

Branch: refs/heads/master
Commit: 3048e5e1686a5ae0a0f04fd30fdda0a380e9d13d
Parents: 886efef
Author: Vinod Kone <[email protected]>
Authored: Tue Feb 3 14:34:28 2015 -0800
Committer: Vinod Kone <[email protected]>
Committed: Wed Feb 4 16:27:49 2015 -0800

----------------------------------------------------------------------
 src/master/master.cpp  | 125 +++++++++++++++++++++++---------------------
 src/master/master.hpp  |   4 +-
 src/master/metrics.cpp |  12 ++++-
 src/master/metrics.hpp |   4 ++
 4 files changed, 84 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e42b922..234bbec 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -118,13 +118,15 @@ public:
                 const SlaveInfo& _slaveInfo,
                 const SlaveID& _slaveId,
                 const PID<Master>& _master,
-                const Option<shared_ptr<RateLimiter>>& _limiter)
+                const Option<shared_ptr<RateLimiter>>& _limiter,
+                const shared_ptr<Metrics> _metrics)
     : ProcessBase(process::ID::generate("slave-observer")),
       slave(_slave),
       slaveInfo(_slaveInfo),
       slaveId(_slaveId),
       master(_master),
       limiter(_limiter),
+      metrics(_metrics),
       timeouts(0),
       pinged(false),
       connected(true)
@@ -214,6 +216,8 @@ protected:
       acquire = limiter.get()->acquire();
     }
 
+    ++metrics->slave_shutdowns_scheduled;
+
     shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
   }
 
@@ -236,6 +240,8 @@ protected:
     } else if (future.isDiscarded()) {
       LOG(INFO) << "Canceling shutdown of slave " << slaveId
                 << " since a pong is received!";
+
+      ++metrics->slave_shutdowns_canceled;
     }
 
     shuttingDown = None();
@@ -247,6 +253,7 @@ private:
   const SlaveID slaveId;
   const PID<Master> master;
   const Option<shared_ptr<RateLimiter>> limiter;
+  shared_ptr<Metrics> metrics;
   Option<Future<Nothing>> shuttingDown;
   uint32_t timeouts;
   bool pinged;
@@ -273,7 +280,7 @@ Master::Master(
     contender(_contender),
     detector(_detector),
     authorizer(_authorizer),
-    metrics(*this),
+    metrics(new Metrics(*this)),
     electedTime(None())
 {
   // NOTE: We populate 'info_' here instead of inside 'initialize()'
@@ -977,9 +984,9 @@ void Master::visit(const MessageEvent& event)
   // 'Master::Frameworks::principals' for details.
   if (principal.isSome()) {
     // If the framework has a principal, the counter must exist.
-    CHECK(metrics.frameworks.contains(principal.get()));
+    CHECK(metrics->frameworks.contains(principal.get()));
     Counter messages_received =
-      metrics.frameworks.get(principal.get()).get()->messages_received;
+      metrics->frameworks.get(principal.get()).get()->messages_received;
     ++messages_received;
   }
 
@@ -987,7 +994,7 @@ void Master::visit(const MessageEvent& event)
   if (!elected()) {
     VLOG(1) << "Dropping '" << event.message->name << "' message since "
             << "not elected yet";
-    ++metrics.dropped_messages;
+    ++metrics->dropped_messages;
     return;
   }
 
@@ -1001,7 +1008,7 @@ void Master::visit(const MessageEvent& event)
   if (!recovered.get().isReady()) {
     VLOG(1) << "Dropping '" << event.message->name << "' message since "
             << "not recovered yet";
-    ++metrics.dropped_messages;
+    ++metrics->dropped_messages;
     return;
   }
 
@@ -1121,9 +1128,9 @@ void Master::_visit(const MessageEvent& event)
   // Note that it could be removed in handling
   // 'UnregisterFrameworkMessage' if it's the last framework with
   // this principal.
-  if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
+  if (principal.isSome() && metrics->frameworks.contains(principal.get())) {
     Counter messages_processed =
-      metrics.frameworks.get(principal.get()).get()->messages_processed;
+      metrics->frameworks.get(principal.get()).get()->messages_processed;
     ++messages_processed;
   }
 }
@@ -1249,7 +1256,7 @@ void Master::recoveredSlavesTimeout(const Registry& registry)
                  << " (" << slave.info().hostname() << ") did not re-register "
                  << "within the timeout; removing it from the registrar";
 
-    ++metrics.recovery_slave_removals;
+    ++metrics->recovery_slave_removals;
 
     slaves.recovered.erase(slave.info().id());
 
@@ -1542,7 +1549,7 @@ void Master::registerFramework(
     const UPID& from,
     const FrameworkInfo& frameworkInfo)
 {
-  ++metrics.messages_register_framework;
+  ++metrics->messages_register_framework;
 
   if (authenticating.contains(from)) {
     // TODO(vinod): Consider dropping this request and fix the tests
@@ -1652,7 +1659,7 @@ void Master::reregisterFramework(
     const FrameworkInfo& frameworkInfo,
     bool failover)
 {
-  ++metrics.messages_reregister_framework;
+  ++metrics->messages_reregister_framework;
 
   if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
     LOG(ERROR) << "Framework '" << frameworkInfo.name() << "' at " << from
@@ -1881,7 +1888,7 @@ void Master::unregisterFramework(
     const UPID& from,
     const FrameworkID& frameworkId)
 {
-  ++metrics.messages_unregister_framework;
+  ++metrics->messages_unregister_framework;
 
   LOG(INFO) << "Asked to unregister framework " << frameworkId;
 
@@ -1902,7 +1909,7 @@ void Master::deactivateFramework(
     const UPID& from,
     const FrameworkID& frameworkId)
 {
-  ++metrics.messages_deactivate_framework;
+  ++metrics->messages_deactivate_framework;
 
   Framework* framework = getFramework(frameworkId);
 
@@ -2005,7 +2012,7 @@ void Master::resourceRequest(
     const FrameworkID& frameworkId,
     const vector<Request>& requests)
 {
-  ++metrics.messages_resource_request;
+  ++metrics->messages_resource_request;
 
   Framework* framework = getFramework(frameworkId);
 
@@ -2036,9 +2043,9 @@ void Master::launchTasks(
     const vector<OfferID>& offerIds)
 {
   if (!tasks.empty()) {
-    ++metrics.messages_launch_tasks;
+    ++metrics->messages_launch_tasks;
   } else {
-    ++metrics.messages_decline_offers;
+    ++metrics->messages_decline_offers;
   }
 
   Framework* framework = getFramework(frameworkId);
@@ -2235,7 +2242,7 @@ void Master::accept(
             "Task launched with invalid offers: " + error.get().message,
             TaskStatus::REASON_INVALID_OFFERS);
 
-        metrics.tasks_lost++;
+        metrics->tasks_lost++;
         stats.tasks[TASK_LOST]++;
 
         forward(update, UPID(), framework);
@@ -2339,7 +2346,7 @@ void Master::_accept(
                 TaskStatus::REASON_SLAVE_REMOVED :
                 TaskStatus::REASON_SLAVE_DISCONNECTED);
 
-        metrics.tasks_lost++;
+        metrics->tasks_lost++;
         stats.tasks[TASK_LOST]++;
 
         forward(update, UPID(), framework);
@@ -2472,7 +2479,7 @@ void Master::_accept(
                     "Not authorized to launch as user '" + user + "'",
                 TaskStatus::REASON_TASK_UNAUTHORIZED);
 
-            metrics.tasks_error++;
+            metrics->tasks_error++;
             stats.tasks[TASK_ERROR]++;
 
             forward(update, UPID(), framework);
@@ -2497,7 +2504,7 @@ void Master::_accept(
                 validationError.get().message,
                 TaskStatus::REASON_TASK_INVALID);
 
-            metrics.tasks_error++;
+            metrics->tasks_error++;
             stats.tasks[TASK_ERROR]++;
 
             forward(update, UPID(), framework);
@@ -2554,7 +2561,7 @@ void Master::_accept(
 
 void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
 {
-  ++metrics.messages_revive_offers;
+  ++metrics->messages_revive_offers;
 
   Framework* framework = getFramework(frameworkId);
 
@@ -2582,7 +2589,7 @@ void Master::killTask(
     const FrameworkID& frameworkId,
     const TaskID& taskId)
 {
-  ++metrics.messages_kill_task;
+  ++metrics->messages_kill_task;
 
   LOG(INFO) << "Asked to kill task " << taskId
             << " of framework " << frameworkId;
@@ -2668,7 +2675,7 @@ void Master::statusUpdateAcknowledgement(
     const TaskID& taskId,
     const string& uuid)
 {
-  metrics.messages_status_update_acknowledgement++;
+  metrics->messages_status_update_acknowledgement++;
 
   // TODO(bmahler): Consider adding a message validator abstraction
   // for the master that takes care of all this boilerplate. Ideally
@@ -2684,7 +2691,7 @@ void Master::statusUpdateAcknowledgement(
       << "Ignoring status update acknowledgement message for task " << taskId
       << " of framework " << frameworkId << " on slave " << slaveId
       << " because the framework cannot be found";
-    metrics.invalid_status_update_acknowledgements++;
+    metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
@@ -2693,7 +2700,7 @@ void Master::statusUpdateAcknowledgement(
       << "Ignoring status update acknowledgement message for task " << taskId
       << " of framework " << *framework << " on slave " << slaveId
       << " because it is not expected from " << from;
-    metrics.invalid_status_update_acknowledgements++;
+    metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
@@ -2704,7 +2711,7 @@ void Master::statusUpdateAcknowledgement(
       << "Cannot send status update acknowledgement message for task " << taskId
       << " of framework " << *framework << " to slave " << slaveId
       << " because slave is not registered";
-    metrics.invalid_status_update_acknowledgements++;
+    metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
@@ -2713,7 +2720,7 @@ void Master::statusUpdateAcknowledgement(
       << "Cannot send status update acknowledgement message for task " << taskId
       << " of framework " << *framework << " to slave " << *slave
       << " because slave is disconnected";
-    metrics.invalid_status_update_acknowledgements++;
+    metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
@@ -2737,7 +2744,7 @@ void Master::statusUpdateAcknowledgement(
         << "Ignoring status update acknowledgement message for task " << taskId
         << " of framework " << *framework << " to slave " << *slave
         << " because it no update was sent by this master";
-      metrics.invalid_status_update_acknowledgements++;
+      metrics->invalid_status_update_acknowledgements++;
       return;
     }
 
@@ -2760,7 +2767,7 @@ void Master::statusUpdateAcknowledgement(
 
   send(slave->pid, message);
 
-  metrics.valid_status_update_acknowledgements++;
+  metrics->valid_status_update_acknowledgements++;
 }
 
 
@@ -2771,7 +2778,7 @@ void Master::schedulerMessage(
     const ExecutorID& executorId,
     const string& data)
 {
-  ++metrics.messages_framework_to_executor;
+  ++metrics->messages_framework_to_executor;
 
   Framework* framework = getFramework(frameworkId);
 
@@ -2781,7 +2788,7 @@ void Master::schedulerMessage(
       << " of framework " << frameworkId
       << " because the framework cannot be found";
     stats.invalidFrameworkMessages++;
-    metrics.invalid_framework_to_executor_messages++;
+    metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
@@ -2791,7 +2798,7 @@ void Master::schedulerMessage(
       << " of framework " << *framework
       << " because it is not expected from " << from;
     stats.invalidFrameworkMessages++;
-    metrics.invalid_framework_to_executor_messages++;
+    metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
@@ -2801,7 +2808,7 @@ void Master::schedulerMessage(
                  << *framework << " to slave " << slaveId
                  << " because slave is not registered";
     stats.invalidFrameworkMessages++;
-    metrics.invalid_framework_to_executor_messages++;
+    metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
@@ -2810,7 +2817,7 @@ void Master::schedulerMessage(
                  << *framework << " to slave " << *slave
                  << " because slave is disconnected";
     stats.invalidFrameworkMessages++;
-    metrics.invalid_framework_to_executor_messages++;
+    metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
@@ -2825,7 +2832,7 @@ void Master::schedulerMessage(
   send(slave->pid, message);
 
   stats.validFrameworkMessages++;
-  metrics.valid_framework_to_executor_messages++;
+  metrics->valid_framework_to_executor_messages++;
 }
 
 
@@ -2835,7 +2842,7 @@ void Master::registerSlave(
     const vector<Resource>& checkpointedResources,
     const string& version)
 {
-  ++metrics.messages_register_slave;
+  ++metrics->messages_register_slave;
 
   if (authenticating.contains(from)) {
     LOG(INFO) << "Queuing up registration request from " << from
@@ -2953,7 +2960,7 @@ void Master::_registerSlave(
         Clock::now(),
         checkpointedResources);
 
-    ++metrics.slave_registrations;
+    ++metrics->slave_registrations;
 
     addSlave(slave);
 
@@ -2976,7 +2983,7 @@ void Master::reregisterSlave(
     const vector<Archive::Framework>& completedFrameworks,
     const string& version)
 {
-  ++metrics.messages_reregister_slave;
+  ++metrics->messages_reregister_slave;
 
   if (authenticating.contains(from)) {
     LOG(INFO) << "Queuing up re-registration request from " << from
@@ -3149,7 +3156,7 @@ void Master::_reregisterSlave(
 
     slave->reregisteredTime = Clock::now();
 
-    ++metrics.slave_reregistrations;
+    ++metrics->slave_reregistrations;
 
     addSlave(slave, completedFrameworks);
 
@@ -3198,7 +3205,7 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks)
 
 void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 {
-  ++metrics.messages_unregister_slave;
+  ++metrics->messages_unregister_slave;
 
   LOG(INFO) << "Asked to unregister slave " << slaveId;
 
@@ -3219,7 +3226,7 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 // because the status updates will be sent by the slave.
 void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 {
-  ++metrics.messages_status_update;
+  ++metrics->messages_status_update;
 
   if (slaves.removed.get(update.slave_id()).isSome()) {
     // If the slave is removed, we have already informed
@@ -3235,7 +3242,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
     send(pid, message);
 
     stats.invalidStatusUpdates++;
-    metrics.invalid_status_updates++;
+    metrics->invalid_status_updates++;
     return;
   }
 
@@ -3246,7 +3253,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
                  << " from unknown slave " << pid
                  << " with id " << update.slave_id();
     stats.invalidStatusUpdates++;
-    metrics.invalid_status_updates++;
+    metrics->invalid_status_updates++;
     return;
   }
 
@@ -3257,7 +3264,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
                  << " from slave " << *slave
                  << " because the framework is unknown";
     stats.invalidStatusUpdates++;
-    metrics.invalid_status_updates++;
+    metrics->invalid_status_updates++;
     return;
   }
 
@@ -3272,7 +3279,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
     LOG(WARNING) << "Could not lookup task for status update " << update
                  << " from slave " << *slave;
     stats.invalidStatusUpdates++;
-    metrics.invalid_status_updates++;
+    metrics->invalid_status_updates++;
     return;
   }
 
@@ -3285,7 +3292,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
   }
 
   stats.validStatusUpdates++;
-  metrics.valid_status_updates++;
+  metrics->valid_status_updates++;
 }
 
 
@@ -3319,7 +3326,7 @@ void Master::exitedExecutor(
     const ExecutorID& executorId,
     int32_t status)
 {
-  ++metrics.messages_exited_executor;
+  ++metrics->messages_exited_executor;
 
   if (slaves.removed.get(slaveId).isSome()) {
     // If the slave is removed, we have already informed
@@ -3394,7 +3401,7 @@ void Master::reconcileTasks(
     const FrameworkID& frameworkId,
     const std::vector<TaskStatus>& statuses)
 {
-  ++metrics.messages_reconcile_tasks;
+  ++metrics->messages_reconcile_tasks;
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
@@ -3735,7 +3742,7 @@ void Master::offer(const FrameworkID& frameworkId,
 // 'authenticate' message doesn't contain the 'FrameworkID'.
 void Master::authenticate(const UPID& from, const UPID& pid)
 {
-  ++metrics.messages_authenticate;
+  ++metrics->messages_authenticate;
 
   // An authentication request is sent by a client (slave/framework)
   // in the following cases:
@@ -4073,8 +4080,8 @@ void Master::addFramework(Framework* framework)
   if (principal.isSome()) {
     // Create new framework metrics if this framework is the first
     // one of this principal. Otherwise existing metrics are reused.
-    if (!metrics.frameworks.contains(principal.get())) {
-      metrics.frameworks.put(
+    if (!metrics->frameworks.contains(principal.get())) {
+      metrics->frameworks.put(
           principal.get(),
           Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
     }
@@ -4252,8 +4259,8 @@ void Master::removeFramework(Framework* framework)
     // Remove the metrics for the principal if this framework is the
     // last one with this principal.
     if (!frameworks.principals.containsValue(principal.get())) {
-      CHECK(metrics.frameworks.contains(principal.get()));
-      metrics.frameworks.erase(principal.get());
+      CHECK(metrics->frameworks.contains(principal.get()));
+      metrics->frameworks.erase(principal.get());
     }
   }
 
@@ -4321,7 +4328,7 @@ void Master::addSlave(
 
   // Set up an observer for the slave.
   slave->observer = new SlaveObserver(
-      slave->pid, slave->info, slave->id, self(), slaves.limiter);
+      slave->pid, slave->info, slave->id, self(), slaves.limiter, metrics);
 
   spawn(slave->observer);
 
@@ -4496,7 +4503,7 @@ void Master::_removeSlave(
 
   LOG(INFO) << "Removed slave " << slaveInfo.id() << " ("
             << slaveInfo.hostname() << ")";
-  ++metrics.slave_removals;
+  ++metrics->slave_removals;
 
   // Forward the LOST updates on to the framework.
   foreach (const StatusUpdate& update, updates) {
@@ -4611,10 +4618,10 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
     }
 
     switch (task->state()) {
-      case TASK_FINISHED: ++metrics.tasks_finished; break;
-      case TASK_FAILED:   ++metrics.tasks_failed;   break;
-      case TASK_KILLED:   ++metrics.tasks_killed;   break;
-      case TASK_LOST:     ++metrics.tasks_lost;     break;
+      case TASK_FINISHED: ++metrics->tasks_finished; break;
+      case TASK_FAILED:   ++metrics->tasks_failed;   break;
+      case TASK_KILLED:   ++metrics->tasks_killed;   break;
+      case TASK_LOST:     ++metrics->tasks_lost;     break;
       default: break;
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5a5c86f..dcfd38a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -666,7 +666,9 @@ private:
     uint64_t invalidFrameworkMessages;
   } stats;
 
-  Metrics metrics;
+  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
+  // thread safe.
+  memory::shared_ptr<Metrics> metrics;
 
   // Gauge handlers.
   double _uptime_secs()

http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 956fe50..a5cde16 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -148,7 +148,11 @@ Metrics::Metrics(const Master& master)
     slave_reregistrations(
         "master/slave_reregistrations"),
     slave_removals(
-        "master/slave_removals")
+        "master/slave_removals"),
+    slave_shutdowns_scheduled(
+         "master/slave_shutdowns_scheduled"),
+    slave_shutdowns_canceled(
+         "master/slave_shutdowns_canceled")
 {
   // TODO(dhamon): Check return values of 'add'.
   process::metrics::add(uptime_secs);
@@ -220,6 +224,9 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(slave_reregistrations);
   process::metrics::add(slave_removals);
 
+  process::metrics::add(slave_shutdowns_scheduled);
+  process::metrics::add(slave_shutdowns_canceled);
+
   // Create resource gauges.
   // TODO(dhamon): Set these up dynamically when adding a slave based on the
   // resources the slave exposes.
@@ -319,6 +326,9 @@ Metrics::~Metrics()
   process::metrics::remove(slave_reregistrations);
   process::metrics::remove(slave_removals);
 
+  process::metrics::remove(slave_shutdowns_scheduled);
+  process::metrics::remove(slave_shutdowns_canceled);
+
   foreach (const process::metrics::Gauge& gauge, resources_total) {
     process::metrics::remove(gauge);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3048e5e1/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index 6a43abc..5e18f88 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -156,6 +156,10 @@ struct Metrics
   process::metrics::Counter slave_reregistrations;
   process::metrics::Counter slave_removals;
 
+  // Slave observer metrics.
+  process::metrics::Counter slave_shutdowns_scheduled;
+  process::metrics::Counter slave_shutdowns_canceled;
+
   // Resource metrics.
   std::vector<process::metrics::Gauge> resources_total;
   std::vector<process::metrics::Gauge> resources_used;

Reply via email to