Repository: mesos Updated Branches: refs/heads/master 26091f461 -> c24268f13
Index slaves by UPID in the master. Review: https://reviews.apache.org/r/34388 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42cf03af Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42cf03af Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42cf03af Branch: refs/heads/master Commit: 42cf03af66f2691d04e5c88ac7e098625d38e0bf Parents: b19ffd2 Author: Benjamin Mahler <[email protected]> Authored: Mon May 18 18:37:11 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Tue May 19 11:55:30 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 127 +++++++++++++++++++++++---------------------- src/master/master.hpp | 65 ++++++++++++++++++++++- 2 files changed, 129 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index eaea79d..d2df99c 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -973,7 +973,9 @@ void Master::exited(const UPID& pid) } } - // The semantics when a slave gets disconnected are as follows: + // The semantics when a registered slave gets disconnected are as + // follows: + // // 1) If the slave is not checkpointing, the slave is immediately // removed and all tasks running on it are transitioned to LOST. // No resources are recovered, because the slave is removed. @@ -985,42 +987,42 @@ void Master::exited(const UPID& pid) // 2.2) Framework is not-checkpointing: The slave is not removed // but the framework is removed from the slave's structs, // its tasks transitioned to LOST and resources recovered. - foreachvalue (Slave* slave, slaves.registered) { - if (slave->pid == pid) { - LOG(INFO) << "Slave " << *slave << " disconnected"; - - if (!slave->info.checkpoint()) { - // Remove the slave, if it is not checkpointing. - LOG(INFO) << "Removing disconnected slave " << *slave - << " because it is not checkpointing!"; - removeSlave(slave, - "slave is non-checkpointing and disconnected"); - return; - } else if (slave->connected) { - // Checkpointing slaves can just be disconnected. - disconnect(slave); + if (slaves.registered.contains(pid)) { + Slave* slave = slaves.registered.get(pid); + CHECK_NOTNULL(slave); + + LOG(INFO) << "Slave " << *slave << " disconnected"; + + if (!slave->info.checkpoint()) { + // Remove the slave, if it is not checkpointing. + LOG(INFO) << "Removing disconnected slave " << *slave + << " because it is not checkpointing!"; + removeSlave(slave, "slave is non-checkpointing and disconnected"); + return; + } else if (slave->connected) { + // Checkpointing slaves can just be disconnected. + disconnect(slave); - // Remove all non-checkpointing frameworks. - hashset<FrameworkID> frameworkIds = + // Remove all non-checkpointing frameworks. + hashset<FrameworkID> frameworkIds = slave->tasks.keys() | slave->executors.keys(); - foreach (const FrameworkID& frameworkId, frameworkIds) { - Framework* framework = getFramework(frameworkId); - if (framework != NULL && !framework->info.checkpoint()) { - LOG(INFO) << "Removing framework " << *framework - << " from disconnected slave " << *slave - << " because the framework is not checkpointing"; + foreach (const FrameworkID& frameworkId, frameworkIds) { + Framework* framework = getFramework(frameworkId); + if (framework != NULL && !framework->info.checkpoint()) { + LOG(INFO) << "Removing framework " << *framework + << " from disconnected slave " << *slave + << " because the framework is not checkpointing"; - removeFramework(slave, framework); - } + removeFramework(slave, framework); } - } else { - // NOTE: A duplicate exited() event is possible for a slave - // because its PID doesn't change on restart. See MESOS-675 - // for details. - LOG(WARNING) << "Ignoring duplicate exited() notification for " - << "checkpointing slave " << *slave; } + } else { + // NOTE: A duplicate exited() event is possible for a slave + // because its PID doesn't change on restart. See MESOS-675 + // for details. + LOG(WARNING) << "Ignoring duplicate exited() notification for " + << "checkpointing slave " << *slave; } } } @@ -3094,31 +3096,30 @@ void Master::registerSlave( } // Check if this slave is already registered (because it retries). - foreachvalue (Slave* slave, slaves.registered) { - if (slave->pid == from) { - if (!slave->connected) { - // The slave was previously disconnected but it is now trying - // to register as a new slave. This could happen if the slave - // failed recovery and hence registering as a new slave before - // the master removed the old slave from its map. - LOG(INFO) - << "Removing old disconnected slave " << *slave - << " because a registration attempt is being made from " << from; - removeSlave(slave, - "a new slave registered at the same address", - metrics->slave_removals_reason_registered); - break; - } else { - CHECK(slave->active) - << "Unexpected connected but deactivated slave " << *slave; - - LOG(INFO) << "Slave " << *slave << " already registered," - << " resending acknowledgement"; - SlaveRegisteredMessage message; - message.mutable_slave_id()->MergeFrom(slave->id); - send(from, message); - return; - } + if (slaves.registered.contains(from)) { + Slave* slave = slaves.registered.get(from); + CHECK_NOTNULL(slave); + + if (!slave->connected) { + // The slave was previously disconnected but it is now trying + // to register as a new slave. This could happen if the slave + // failed recovery and hence registering as a new slave before + // the master removed the old slave from its map. + LOG(INFO) << "Removing old disconnected slave " << *slave + << " because a registration attempt occurred"; + removeSlave(slave, + "a new slave registered at the same address", + metrics->slave_removals_reason_registered); + } else { + CHECK(slave->active) + << "Unexpected connected but deactivated slave " << *slave; + + LOG(INFO) << "Slave " << *slave << " already registered," + << " resending acknowledgement"; + SlaveRegisteredMessage message; + message.mutable_slave_id()->MergeFrom(slave->id); + send(from, message); + return; } } @@ -3574,7 +3575,8 @@ void Master::exitedExecutor( return; } - Slave* slave = CHECK_NOTNULL(slaves.registered[slaveId]); + Slave* slave = slaves.registered.get(slaveId); + CHECK_NOTNULL(slave); if (!slave->hasExecutor(frameworkId, executorId)) { LOG(WARNING) << "Ignoring unknown exited executor '" << executorId @@ -3624,7 +3626,7 @@ void Master::shutdown( return; } - Slave* slave = slaves.registered[shutdown.slave_id()]; + Slave* slave = slaves.registered.get(shutdown.slave_id()); CHECK_NOTNULL(slave); ShutdownExecutorMessage message; @@ -3643,7 +3645,7 @@ void Master::shutdownSlave(const SlaveID& slaveId, const string& message) return; } - Slave* slave = slaves.registered[slaveId]; + Slave* slave = slaves.registered.get(slaveId); CHECK_NOTNULL(slave); LOG(WARNING) << "Shutting down slave " << *slave << " with message '" @@ -3919,7 +3921,8 @@ void Master::offer(const FrameworkID& frameworkId, continue; } - Slave* slave = slaves.registered[slaveId]; + Slave* slave = slaves.registered.get(slaveId); + CHECK_NOTNULL(slave); CHECK(slave->info.checkpoint() || !framework->info.checkpoint()) << "Resources of non checkpointing slave " << *slave @@ -4599,7 +4602,7 @@ void Master::addSlave( CHECK_NOTNULL(slave); slaves.removed.erase(slave->id); - slaves.registered[slave->id] = slave; + slaves.registered.put(slave); link(slave->pid); @@ -4734,7 +4737,7 @@ void Master::removeSlave( // Mark the slave as being removed. slaves.removing.insert(slave->id); - slaves.registered.erase(slave->id); + slaves.registered.remove(slave); slaves.removed.put(slave->id, Nothing()); authenticated.erase(slave->pid); http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 0922a7c..4a94e23 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1154,7 +1154,70 @@ private: // these slaves until the registrar determines their fate. hashset<SlaveID> reregistering; - hashmap<SlaveID, Slave*> registered; + // Registered slaves are indexed by SlaveID and UPID. Note that + // iteration is supported but is exposed as iteration over a + // hashmap<SlaveID, Slave*> since it is tedious to convert + // the map's key/value iterator into a value iterator. + // + // TODO(bmahler): Consider pulling in boost's multi_index, + // or creating a simpler indexing abstraction in stout. + struct + { + bool contains(const SlaveID& slaveId) const + { + return ids.contains(slaveId); + } + + bool contains(const process::UPID& pid) const + { + return pids.contains(pid); + } + + Slave* get(const SlaveID& slaveId) const + { + return ids.get(slaveId).get(NULL); + } + + Slave* get(const process::UPID& pid) const + { + return pids.get(pid).get(NULL); + } + + void put(Slave* slave) + { + CHECK_NOTNULL(slave); + ids[slave->id] = slave; + pids[slave->pid] = slave; + } + + void remove(Slave* slave) + { + CHECK_NOTNULL(slave); + ids.erase(slave->id); + pids.erase(slave->pid); + } + + void clear() + { + ids.clear(); + pids.clear(); + } + + size_t size() const { return ids.size(); } + + typedef hashmap<SlaveID, Slave*>::iterator iterator; + typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator; + + iterator begin() { return ids.begin(); } + iterator end() { return ids.end(); } + + const_iterator begin() const { return ids.begin(); } + const_iterator end() const { return ids.end(); } + + private: + hashmap<SlaveID, Slave*> ids; + hashmap<process::UPID, Slave*> pids; + } registered; // Slaves that are in the process of being removed from the // registrar. Think of these as being partially removed: we must
