Repository: mesos
Updated Branches:
  refs/heads/master 26091f461 -> c24268f13


Index slaves by UPID in the master.

Review: https://reviews.apache.org/r/34388


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42cf03af
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42cf03af
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42cf03af

Branch: refs/heads/master
Commit: 42cf03af66f2691d04e5c88ac7e098625d38e0bf
Parents: b19ffd2
Author: Benjamin Mahler <[email protected]>
Authored: Mon May 18 18:37:11 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Tue May 19 11:55:30 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 127 +++++++++++++++++++++++----------------------
 src/master/master.hpp |  65 ++++++++++++++++++++++-
 2 files changed, 129 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index eaea79d..d2df99c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -973,7 +973,9 @@ void Master::exited(const UPID& pid)
     }
   }
 
-  // The semantics when a slave gets disconnected are as follows:
+  // The semantics when a registered slave gets disconnected are as
+  // follows:
+  //
   // 1) If the slave is not checkpointing, the slave is immediately
   //    removed and all tasks running on it are transitioned to LOST.
   //    No resources are recovered, because the slave is removed.
@@ -985,42 +987,42 @@ void Master::exited(const UPID& pid)
   //    2.2) Framework is not-checkpointing: The slave is not removed
   //         but the framework is removed from the slave's structs,
   //         its tasks transitioned to LOST and resources recovered.
-  foreachvalue (Slave* slave, slaves.registered) {
-    if (slave->pid == pid) {
-      LOG(INFO) << "Slave " << *slave << " disconnected";
-
-      if (!slave->info.checkpoint()) {
-        // Remove the slave, if it is not checkpointing.
-        LOG(INFO) << "Removing disconnected slave " << *slave
-                  << " because it is not checkpointing!";
-        removeSlave(slave,
-                    "slave is non-checkpointing and disconnected");
-        return;
-      } else if (slave->connected) {
-        // Checkpointing slaves can just be disconnected.
-        disconnect(slave);
+  if (slaves.registered.contains(pid)) {
+    Slave* slave = slaves.registered.get(pid);
+    CHECK_NOTNULL(slave);
+
+    LOG(INFO) << "Slave " << *slave << " disconnected";
+
+    if (!slave->info.checkpoint()) {
+      // Remove the slave, if it is not checkpointing.
+      LOG(INFO) << "Removing disconnected slave " << *slave
+                << " because it is not checkpointing!";
+      removeSlave(slave, "slave is non-checkpointing and disconnected");
+      return;
+    } else if (slave->connected) {
+      // Checkpointing slaves can just be disconnected.
+      disconnect(slave);
 
-        // Remove all non-checkpointing frameworks.
-        hashset<FrameworkID> frameworkIds =
+      // Remove all non-checkpointing frameworks.
+      hashset<FrameworkID> frameworkIds =
           slave->tasks.keys() | slave->executors.keys();
 
-        foreach (const FrameworkID& frameworkId, frameworkIds) {
-          Framework* framework = getFramework(frameworkId);
-          if (framework != NULL && !framework->info.checkpoint()) {
-            LOG(INFO) << "Removing framework " << *framework
-                      << " from disconnected slave " << *slave
-                      << " because the framework is not checkpointing";
+      foreach (const FrameworkID& frameworkId, frameworkIds) {
+        Framework* framework = getFramework(frameworkId);
+        if (framework != NULL && !framework->info.checkpoint()) {
+          LOG(INFO) << "Removing framework " << *framework
+                    << " from disconnected slave " << *slave
+                    << " because the framework is not checkpointing";
 
-            removeFramework(slave, framework);
-          }
+          removeFramework(slave, framework);
         }
-      } else {
-        // NOTE: A duplicate exited() event is possible for a slave
-        // because its PID doesn't change on restart. See MESOS-675
-        // for details.
-        LOG(WARNING) << "Ignoring duplicate exited() notification for "
-                     << "checkpointing slave " << *slave;
       }
+    } else {
+      // NOTE: A duplicate exited() event is possible for a slave
+      // because its PID doesn't change on restart. See MESOS-675
+      // for details.
+      LOG(WARNING) << "Ignoring duplicate exited() notification for "
+                   << "checkpointing slave " << *slave;
     }
   }
 }
@@ -3094,31 +3096,30 @@ void Master::registerSlave(
   }
 
   // Check if this slave is already registered (because it retries).
-  foreachvalue (Slave* slave, slaves.registered) {
-    if (slave->pid == from) {
-      if (!slave->connected) {
-        // The slave was previously disconnected but it is now trying
-        // to register as a new slave. This could happen if the slave
-        // failed recovery and hence registering as a new slave before
-        // the master removed the old slave from its map.
-        LOG(INFO)
-          << "Removing old disconnected slave " << *slave
-          << " because a registration attempt is being made from " << from;
-        removeSlave(slave,
-                    "a new slave registered at the same address",
-                    metrics->slave_removals_reason_registered);
-        break;
-      } else {
-        CHECK(slave->active)
-            << "Unexpected connected but deactivated slave " << *slave;
-
-        LOG(INFO) << "Slave " << *slave << " already registered,"
-                  << " resending acknowledgement";
-        SlaveRegisteredMessage message;
-        message.mutable_slave_id()->MergeFrom(slave->id);
-        send(from, message);
-        return;
-      }
+  if (slaves.registered.contains(from)) {
+    Slave* slave = slaves.registered.get(from);
+    CHECK_NOTNULL(slave);
+
+    if (!slave->connected) {
+      // The slave was previously disconnected but it is now trying
+      // to register as a new slave. This could happen if the slave
+      // failed recovery and hence registering as a new slave before
+      // the master removed the old slave from its map.
+      LOG(INFO) << "Removing old disconnected slave " << *slave
+                << " because a registration attempt occurred";
+      removeSlave(slave,
+                  "a new slave registered at the same address",
+                  metrics->slave_removals_reason_registered);
+    } else {
+      CHECK(slave->active)
+        << "Unexpected connected but deactivated slave " << *slave;
+
+      LOG(INFO) << "Slave " << *slave << " already registered,"
+                << " resending acknowledgement";
+      SlaveRegisteredMessage message;
+      message.mutable_slave_id()->MergeFrom(slave->id);
+      send(from, message);
+      return;
     }
   }
 
@@ -3574,7 +3575,8 @@ void Master::exitedExecutor(
     return;
   }
 
-  Slave* slave = CHECK_NOTNULL(slaves.registered[slaveId]);
+  Slave* slave = slaves.registered.get(slaveId);
+  CHECK_NOTNULL(slave);
 
   if (!slave->hasExecutor(frameworkId, executorId)) {
     LOG(WARNING) << "Ignoring unknown exited executor '" << executorId
@@ -3624,7 +3626,7 @@ void Master::shutdown(
     return;
   }
 
-  Slave* slave = slaves.registered[shutdown.slave_id()];
+  Slave* slave = slaves.registered.get(shutdown.slave_id());
   CHECK_NOTNULL(slave);
 
   ShutdownExecutorMessage message;
@@ -3643,7 +3645,7 @@ void Master::shutdownSlave(const SlaveID& slaveId, const string& message)
     return;
   }
 
-  Slave* slave = slaves.registered[slaveId];
+  Slave* slave = slaves.registered.get(slaveId);
   CHECK_NOTNULL(slave);
 
   LOG(WARNING) << "Shutting down slave " << *slave << " with message '"
@@ -3919,7 +3921,8 @@ void Master::offer(const FrameworkID& frameworkId,
       continue;
     }
 
-    Slave* slave = slaves.registered[slaveId];
+    Slave* slave = slaves.registered.get(slaveId);
+    CHECK_NOTNULL(slave);
 
     CHECK(slave->info.checkpoint() || !framework->info.checkpoint())
         << "Resources of non checkpointing slave " << *slave
@@ -4599,7 +4602,7 @@ void Master::addSlave(
   CHECK_NOTNULL(slave);
 
   slaves.removed.erase(slave->id);
-  slaves.registered[slave->id] = slave;
+  slaves.registered.put(slave);
 
   link(slave->pid);
 
@@ -4734,7 +4737,7 @@ void Master::removeSlave(
 
   // Mark the slave as being removed.
   slaves.removing.insert(slave->id);
-  slaves.registered.erase(slave->id);
+  slaves.registered.remove(slave);
   slaves.removed.put(slave->id, Nothing());
   authenticated.erase(slave->pid);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0922a7c..4a94e23 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1154,7 +1154,70 @@ private:
     // these slaves until the registrar determines their fate.
     hashset<SlaveID> reregistering;
 
-    hashmap<SlaveID, Slave*> registered;
+    // Registered slaves are indexed by SlaveID and UPID. Note that
+    // iteration is supported but is exposed as iteration over a
+    // hashmap<SlaveID, Slave*> since it is tedious to convert
+    // the map's key/value iterator into a value iterator.
+    //
+    // TODO(bmahler): Consider pulling in boost's multi_index,
+    // or creating a simpler indexing abstraction in stout.
+    struct
+    {
+      bool contains(const SlaveID& slaveId) const
+      {
+        return ids.contains(slaveId);
+      }
+
+      bool contains(const process::UPID& pid) const
+      {
+        return pids.contains(pid);
+      }
+
+      Slave* get(const SlaveID& slaveId) const
+      {
+        return ids.get(slaveId).get(NULL);
+      }
+
+      Slave* get(const process::UPID& pid) const
+      {
+        return pids.get(pid).get(NULL);
+      }
+
+      void put(Slave* slave)
+      {
+        CHECK_NOTNULL(slave);
+        ids[slave->id] = slave;
+        pids[slave->pid] = slave;
+      }
+
+      void remove(Slave* slave)
+      {
+        CHECK_NOTNULL(slave);
+        ids.erase(slave->id);
+        pids.erase(slave->pid);
+      }
+
+      void clear()
+      {
+        ids.clear();
+        pids.clear();
+      }
+
+      size_t size() const { return ids.size(); }
+
+      typedef hashmap<SlaveID, Slave*>::iterator iterator;
+      typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
+
+      iterator begin() { return ids.begin(); }
+      iterator end()   { return ids.end();   }
+
+      const_iterator begin() const { return ids.begin(); }
+      const_iterator end()   const { return ids.end();   }
+
+    private:
+      hashmap<SlaveID, Slave*> ids;
+      hashmap<process::UPID, Slave*> pids;
+    } registered;
 
     // Slaves that are in the process of being removed from the
     // registrar. Think of these as being partially removed: we must

Reply via email to