Repository: mesos
Updated Branches: refs/heads/master a8130d3c2 -> bef49064a
Updated semantics of disconnected/deactivated slaves/frameworks in master. Review: https://reviews.apache.org/r/25866 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0b66d1da Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0b66d1da Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0b66d1da Branch: refs/heads/master Commit: 0b66d1da5ec9ce29dd324b38c5dd675fad4b294f Parents: a8130d3 Author: Vinod Kone <[email protected]> Authored: Thu Sep 18 10:15:20 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu Sep 25 13:54:19 2014 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 2 +- src/master/master.cpp | 293 ++++++++++++++++++++++--------- src/master/master.hpp | 49 ++++-- src/tests/fault_tolerance_tests.cpp | 8 - src/tests/master_tests.cpp | 4 + 5 files changed, 244 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0b66d1da/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 3f5a01d..41d91c8 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -366,7 +366,7 @@ Future<Response> Master::Http::stats(const Request& request) object.values["uptime"] = (Clock::now() - master->startTime).secs(); object.values["elected"] = master->elected() ? 
1 : 0; object.values["total_schedulers"] = master->frameworks.registered.size(); - object.values["active_schedulers"] = master->getActiveFrameworks().size(); + object.values["active_schedulers"] = master->_frameworks_active(); object.values["activated_slaves"] = master->_slaves_active(); object.values["deactivated_slaves"] = master->_slaves_inactive(); object.values["outstanding_offers"] = master->offers.size(); http://git-wip-us.apache.org/repos/asf/mesos/blob/0b66d1da/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index e5d30e9..92d93fe 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -740,8 +740,8 @@ void Master::exited(const UPID& pid) if (framework->pid == pid) { LOG(INFO) << "Framework " << framework->id << " disconnected"; - // Deactivate framework. - deactivate(framework); + // Disconnect the framework. + disconnect(framework); // Set 'failoverTimeout' to the default and update only if the // input is valid. @@ -796,7 +796,7 @@ void Master::exited(const UPID& pid) << " because it is not checkpointing!"; removeSlave(slave); return; - } else if (!slave->disconnected) { + } else if (slave->connected) { // Checkpointing slaves can just be disconnected. disconnect(slave); @@ -815,6 +815,9 @@ void Master::exited(const UPID& pid) } } } else { + // NOTE: A duplicate exited() event is possible for a slave + // because its PID doesn't change on restart. See MESOS-675 + // for details. LOG(WARNING) << "Ignoring duplicate exited() notification for " << "checkpointing slave " << *slave; } @@ -1545,16 +1548,6 @@ void Master::_reregisterFramework( LOG(INFO) << "Allowing the Framework " << frameworkInfo.id() << " to re-register with an already used id"; - // Make sure we can get offers again. - // The framework might have been deactivated if it was doing - // authentication. - // TODO(vinod): Consider adding 'Master::activate(Framework*)'. 
- // TODO(vinod): Do this after we recover resources below. - if (!framework->active) { - framework->active = true; - allocator->frameworkActivated(framework->id, framework->info); - } - // Remove any offers sent to this framework. // NOTE: We need to do this because the scheduler might have // replied to the offers but the driver might have dropped @@ -1568,6 +1561,16 @@ void Master::_reregisterFramework( removeOffer(offer, true); // Rescind. } + framework->connected = true; + + // Reactivate the framework. + // NOTE: We do this after recovering resources (above) so that + // the allocator has the correct view of the framework's share. + if (!framework->active) { + framework->active = true; + allocator->frameworkActivated(framework->id, framework->info); + } + FrameworkReregisteredMessage message; message.mutable_framework_id()->MergeFrom(frameworkInfo.id()); message.mutable_master_info()->MergeFrom(info_); @@ -1669,6 +1672,22 @@ void Master::deactivateFramework( } +void Master::disconnect(Framework* framework) +{ + CHECK_NOTNULL(framework); + + LOG(INFO) << "Disconnecting framework " << framework->id; + + framework->connected = false; + + // Remove the framework from authenticated. This is safe because + // a framework will always reauthenticate before (re-)registering. + authenticated.erase(framework->pid); + + deactivate(framework); +} + + void Master::deactivate(Framework* framework) { CHECK_NOTNULL(framework); @@ -1681,10 +1700,6 @@ void Master::deactivate(Framework* framework) // Tell the allocator to stop allocating resources to this framework. allocator->frameworkDeactivated(framework->id); - // Remove the framework from authenticated. This is safe because - // a framework will always reauthenticate before (re-)registering. - authenticated.erase(framework->pid); - // Remove the framework's offers. 
foreach (Offer* offer, utils::copy(framework->offers)) { allocator->resourcesRecovered( @@ -1698,16 +1713,28 @@ void Master::disconnect(Slave* slave) { CHECK_NOTNULL(slave); - LOG(INFO) << "Disconnecting slave " << slave->id; + LOG(INFO) << "Disconnecting slave " << *slave; - // Mark the slave as disconnected and deactivate it in the allocator. - slave->disconnected = true; - allocator->slaveDeactivated(slave->id); + slave->connected = false; // Remove the slave from authenticated. This is safe because // a slave will always reauthenticate before (re-)registering. authenticated.erase(slave->pid); + deactivate(slave); +} + + +void Master::deactivate(Slave* slave) +{ + CHECK_NOTNULL(slave); + + LOG(INFO) << "Deactivating slave " << *slave; + + slave->active = false; + + allocator->slaveDeactivated(slave->id); + // Remove and rescind offers. foreach (Offer* offer, utils::copy(slave->offers)) { allocator->resourcesRecovered( @@ -2078,9 +2105,9 @@ struct SlaveChecker : OfferVisitor << " outlived slave " << offer->slave_id(); // This is not possible because the offer should've been removed. - CHECK(!slave->disconnected) + CHECK(slave->connected) << "Offer " << offerId - << " outlived disconnected slave " << offer->slave_id(); + << " outlived disconnected slave " << *slave; if (slaveId.isNone()) { // Set slave id and use as base case for validation. @@ -2325,7 +2352,8 @@ void Master::launchTask( { CHECK_NOTNULL(framework); CHECK_NOTNULL(slave); - CHECK(!slave->disconnected); + CHECK(slave->connected) << "Launching task " << task.task_id() + << " on disconnected slave " << *slave; // Determine if this task launches an executor, and if so make sure // the slave and framework state has been updated accordingly. 
@@ -2403,7 +2431,7 @@ void Master::_launchTasks( } Slave* slave = getSlave(slaveId); - if (slave == NULL || slave->disconnected) { + if (slave == NULL || !slave->connected) { foreach (const TaskInfo& task, tasks) { const StatusUpdate& update = protobuf::createStatusUpdate( framework->id, @@ -2636,7 +2664,7 @@ void Master::killTask( // NOTE: This task will be properly reconciled when the // disconnected slave re-registers with the master. - if (!slave->disconnected) { + if (slave->connected) { LOG(INFO) << "Telling slave " << *slave << " to kill task " << taskId << " of framework " << frameworkId; @@ -2645,6 +2673,11 @@ void Master::killTask( message.mutable_framework_id()->MergeFrom(frameworkId); message.mutable_task_id()->MergeFrom(taskId); send(slave->pid, message); + } else { + LOG(WARNING) << "Cannot kill task " << taskId + << " of framework " << frameworkId + << " because the slave " << *slave << " is disconnected." + << " Kill will be retried if the slave re-registers"; } } @@ -2697,7 +2730,7 @@ void Master::statusUpdateAcknowledgement( return; } - if (slave->disconnected) { + if (!slave->connected) { LOG(WARNING) << "Cannot send status update acknowledgement message for task " << taskId << " of framework " << frameworkId << " to slave " << *slave @@ -2775,7 +2808,7 @@ void Master::schedulerMessage( return; } - if (slave->disconnected) { + if (!slave->connected) { LOG(WARNING) << "Cannot send framework message for framework " << frameworkId << " to slave " << *slave << " because slave is disconnected"; @@ -2827,7 +2860,7 @@ void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo) // Check if this slave is already registered (because it retries). foreachvalue (Slave* slave, slaves.registered) { if (slave->pid == from) { - if (slave->disconnected) { + if (!slave->connected) { // The slave was previously disconnected but it is now trying // to register as a new slave. 
This could happen if the slave // failed recovery and hence registering as a new slave before @@ -2838,6 +2871,9 @@ void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo) removeSlave(slave); break; } else { + CHECK(slave->active) + << "Unexpected connected but deactivated slave " << *slave; + LOG(INFO) << "Slave " << *slave << " already registered," << " resending acknowledgement"; SlaveRegisteredMessage message; @@ -3012,11 +3048,15 @@ void Master::reregisterSlave( // This is done after reconciliation to ensure the allocator's // offers include the recovered resources initially on this // slave. - if (slave->disconnected) { - slave->disconnected = false; // Reset the flag. + if (!slave->connected) { + slave->connected = true; + slave->active = true; allocator->slaveActivated(slave->id); } + CHECK(slave->active) + << "Unexpected connected but deactivated slave " << *slave; + // Inform the slave of the new framework pids for its tasks. __reregisterSlave(slave, tasks); @@ -3472,7 +3512,7 @@ void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId, { Framework* framework = getFramework(frameworkId); - if (framework != NULL && !framework->active) { + if (framework != NULL && !framework->connected) { // If the re-registration time has not changed, then the framework // has not re-registered within the failover timeout. if (framework->reregisteredTime == reregisteredTime) { @@ -3520,10 +3560,11 @@ void Master::offer(const FrameworkID& frameworkId, << " are being offered to checkpointing framework " << frameworkId; // This could happen if the allocator dispatched 'Master::offer' before - // it received 'Allocator::slaveRemoved' from the master. - if (slave->disconnected) { - LOG(WARNING) << "Master returning resources offered because slave " - << *slave << " is disconnected"; + // the slave was deactivated in the allocator. 
+ if (!slave->active) { + LOG(WARNING) + << "Master returning resources offered because slave " << *slave + << " is " << (slave->connected ? "deactivated" : "disconnected"); allocator->resourcesRecovered(frameworkId, slaveId, offered, None()); continue; @@ -3621,25 +3662,29 @@ void Master::authenticate(const UPID& from, const UPID& pid) { ++metrics.messages_authenticate; - // Deactivate the framework/slave if it's already registered. - // TODO(adam-mesos): MESOS-1081: Do not deactivate the current - // framework/slave before we find out if the new one is legit. - bool found = false; - foreachvalue (Framework* framework, frameworks.registered) { - if (framework->pid == pid) { - deactivate(framework); - found = true; - break; - } - } - if (!found) { - foreachvalue (Slave* slave, slaves.registered) { - if (slave->pid == pid) { - disconnect(slave); - break; - } - } - } + // An authentication request is sent by a client (slave/framework) + // in the following cases: + // + // 1. First time the client is connecting. + // This is straightforward; just proceed with authentication. + // + // 2. Client retried because of ZK expiration / authentication timeout. + // If the client is already authenticated, it will be removed from + // the 'authenticated' map and authentication is retried. + // + // 3. Client restarted. + // 3.1. We are here after receiving 'exited()' from old client. + // This is safe because the client will be first marked as + // disconnected and then when it re-registers it will be + // marked as connected. + // + // 3.2. We are here before receiving 'exited()' from old client. + // This is tricky only if the PID of the client doesn't change + // after restart; true for slave but not for framework. + // If the PID doesn't change the master might mark the client + // disconnected *after* the client re-registers. + // TODO(vinod): To ensure safety the client (slave) should be + // informed about this discrepancy so that it can re-register. 
authenticated.erase(pid); @@ -3721,19 +3766,6 @@ void Master::authenticationTimeout(Future<Option<string> > future) } -// Return connected frameworks that are not in the process of being removed. -vector<Framework*> Master::getActiveFrameworks() const -{ - vector <Framework*> result; - foreachvalue (Framework* framework, frameworks.registered) { - if (framework->active) { - result.push_back(framework); - } - } - return result; -} - - // NOTE: This function is only called when the slave re-registers // with a master that already knows about it (i.e., not a failed // over master). @@ -3935,21 +3967,12 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) framework->pid = newPid; link(newPid); - // Make sure we can get offers again. - // TODO(vinod): Do this after we recover resources below. - if (!framework->active) { - framework->active = true; - allocator->frameworkActivated(framework->id, framework->info); - } - // The scheduler driver safely ignores any duplicate registration // messages, so we don't need to compare the old and new pids here. - { - FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id); - message.mutable_master_info()->MergeFrom(info_); - send(newPid, message); - } + FrameworkRegisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_master_info()->MergeFrom(info_); + send(newPid, message); // Remove the framework's offers (if they weren't removed before). // We do this after we have updated the pid and sent the framework @@ -3961,6 +3984,16 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) removeOffer(offer); } + framework->connected = true; + + // Reactivate the framework. + // NOTE: We do this after recovering resources (above) so that + // the allocator has the correct view of the framework's share. 
+ if (!framework->active) { + framework->active = true; + allocator->frameworkActivated(framework->id, framework->info); + } + // 'Failover' the framework's metrics. i.e., change the lookup key // for its metrics to 'newPid'. if (oldPid != newPid && frameworks.principals.contains(oldPid)) { @@ -4575,7 +4608,7 @@ double Master::_slaves_active() { double count = 0.0; foreachvalue (Slave* slave, slaves.registered) { - if (!slave->disconnected) { + if (slave->active) { count++; } } @@ -4583,13 +4616,83 @@ double Master::_slaves_active() } -// TODO(alexandra.sava): Count also the slaves that have been -// deactivated via HTTP POSTS, once MESOS-1476 will be committed. double Master::_slaves_inactive() { double count = 0.0; foreachvalue (Slave* slave, slaves.registered) { - if (slave->disconnected) { + if (!slave->active) { + count++; + } + } + return count; +} + + +double Master::_slaves_connected() +{ + double count = 0.0; + foreachvalue (Slave* slave, slaves.registered) { + if (slave->connected) { + count++; + } + } + return count; +} + + +double Master::_slaves_disconnected() +{ + double count = 0.0; + foreachvalue (Slave* slave, slaves.registered) { + if (!slave->connected) { + count++; + } + } + return count; +} + + +double Master::_frameworks_connected() +{ + double count = 0.0; + foreachvalue (Framework* framework, frameworks.registered) { + if (framework->connected) { + count++; + } + } + return count; +} + + +double Master::_frameworks_disconnected() +{ + double count = 0.0; + foreachvalue (Framework* framework, frameworks.registered) { + if (!framework->connected) { + count++; + } + } + return count; +} + + +double Master::_frameworks_active() +{ + double count = 0.0; + foreachvalue (Framework* framework, frameworks.registered) { + if (framework->active) { + count++; + } + } + return count; +} + + +double Master::_frameworks_inactive() +{ + double count = 0.0; + foreachvalue (Framework* framework, frameworks.registered) { + if (!framework->active) { count++; } 
} @@ -4670,12 +4773,24 @@ Master::Metrics::Metrics(const Master& master) elected( "master/elected", defer(master, &Master::_elected)), + slaves_connected( + "master/slaves_connected", + defer(master, &Master::_slaves_connected)), + slaves_disconnected( + "master/slaves_disconnected", + defer(master, &Master::_slaves_disconnected)), slaves_active( "master/slaves_active", defer(master, &Master::_slaves_active)), slaves_inactive( "master/slaves_inactive", defer(master, &Master::_slaves_inactive)), + frameworks_connected( + "master/frameworks_connected", + defer(master, &Master::_frameworks_connected)), + frameworks_disconnected( + "master/frameworks_disconnected", + defer(master, &Master::_frameworks_disconnected)), frameworks_active( "master/frameworks_active", defer(master, &Master::_frameworks_active)), @@ -4772,9 +4887,13 @@ Master::Metrics::Metrics(const Master& master) process::metrics::add(uptime_secs); process::metrics::add(elected); + process::metrics::add(slaves_connected); + process::metrics::add(slaves_disconnected); process::metrics::add(slaves_active); process::metrics::add(slaves_inactive); + process::metrics::add(frameworks_connected); + process::metrics::add(frameworks_disconnected); process::metrics::add(frameworks_active); process::metrics::add(frameworks_inactive); @@ -4865,9 +4984,13 @@ Master::Metrics::~Metrics() process::metrics::remove(uptime_secs); process::metrics::remove(elected); + process::metrics::remove(slaves_connected); + process::metrics::remove(slaves_disconnected); process::metrics::remove(slaves_active); process::metrics::remove(slaves_inactive); + process::metrics::remove(frameworks_connected); + process::metrics::remove(frameworks_disconnected); process::metrics::remove(frameworks_active); process::metrics::remove(frameworks_inactive); http://git-wip-us.apache.org/repos/asf/mesos/blob/0b66d1da/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp 
b/src/master/master.hpp index f5d74ae..d638019 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -286,9 +286,6 @@ protected: void fileAttached(const process::Future<Nothing>& result, const std::string& path); - // Return connected frameworks that are not in the process of being removed. - std::vector<Framework*> getActiveFrameworks() const; - // Invoked when the contender has entered the contest. void contended(const process::Future<process::Future<Nothing> >& candidacy); @@ -328,8 +325,11 @@ protected: // executors and recover the resources. void removeFramework(Slave* slave, Framework* framework); + void disconnect(Framework* framework); void deactivate(Framework* framework); + void disconnect(Slave* slave); + void deactivate(Slave* slave); // Add a slave. void addSlave(Slave* slave, bool reregister = false); @@ -606,9 +606,13 @@ private: process::metrics::Gauge uptime_secs; process::metrics::Gauge elected; + process::metrics::Gauge slaves_connected; + process::metrics::Gauge slaves_disconnected; process::metrics::Gauge slaves_active; process::metrics::Gauge slaves_inactive; + process::metrics::Gauge frameworks_connected; + process::metrics::Gauge frameworks_disconnected; process::metrics::Gauge frameworks_active; process::metrics::Gauge frameworks_inactive; @@ -730,19 +734,15 @@ private: return elected() ? 
1 : 0; } + double _slaves_connected(); + double _slaves_disconnected(); double _slaves_active(); - double _slaves_inactive(); - double _frameworks_active() - { - return getActiveFrameworks().size(); - } - - double _frameworks_inactive() - { - return frameworks.registered.size() - _frameworks_active(); - } + double _frameworks_connected(); + double _frameworks_disconnected(); + double _frameworks_active(); + double _frameworks_inactive(); double _outstanding_offers() { @@ -821,7 +821,8 @@ struct Slave info(_info), pid(_pid), registeredTime(time), - disconnected(false), + connected(true), + active(true), observer(NULL) {} ~Slave() {} @@ -937,9 +938,13 @@ struct Slave process::Time registeredTime; Option<process::Time> reregisteredTime; - // We mark a slave 'disconnected' when it has checkpointing - // enabled because we expect it reregister after recovery. - bool disconnected; + // Slave becomes disconnected when the socket closes. + bool connected; + + // Slave becomes deactivated when it gets disconnected. In the + // future this might also happen via HTTP endpoint. + // No offers will be made for a deactivated slave. + bool active; // Executors running on this slave. hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > executors; @@ -984,6 +989,7 @@ struct Framework : id(_id), info(_info), pid(_pid), + connected(true), active(true), registeredTime(time), reregisteredTime(time), @@ -1101,7 +1107,14 @@ struct Framework process::UPID pid; - bool active; // Turns false when framework is being removed. + // Framework becomes disconnected when the socket closes. + bool connected; + + // Framework becomes deactivated when it is disconnected or + // the master receives a DeactivateFrameworkMessage. + // No offers will be made to a deactivated framework. 
+ bool active; + process::Time registeredTime; process::Time reregisteredTime; process::Time unregisteredTime; http://git-wip-us.apache.org/repos/asf/mesos/blob/0b66d1da/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index 1543860..e8f5322 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -2029,10 +2029,6 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration) AWAIT_READY(resourceOffers); - Future<Nothing> offerRescinded; - EXPECT_CALL(sched, offerRescinded(_, _)) - .WillOnce(FutureSatisfy(&offerRescinded)); - Future<SlaveReregisteredMessage> slaveReregisteredMessage = FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); @@ -2040,10 +2036,6 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration) // expiration) at the slave. detector.appoint(master.get()); - // Since an authenticating slave re-registration results in - // disconnecting the slave, its resources should be rescinded. 
- AWAIT_READY(offerRescinded); - AWAIT_READY(slaveReregisteredMessage); driver.stop(); http://git-wip-us.apache.org/repos/asf/mesos/blob/0b66d1da/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 8e4ec1d..1497db2 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -1433,9 +1433,13 @@ TEST_F(MasterTest, MetricsInStatsEndpoint) EXPECT_EQ(1, stats.values["elected"]); EXPECT_EQ(1, stats.values["master/elected"]); + EXPECT_EQ(1u, stats.values.count("master/slaves_connected")); + EXPECT_EQ(1u, stats.values.count("master/slaves_disconnected")); EXPECT_EQ(1u, stats.values.count("master/slaves_active")); EXPECT_EQ(1u, stats.values.count("master/slaves_inactive")); + EXPECT_EQ(1u, stats.values.count("master/frameworks_connected")); + EXPECT_EQ(1u, stats.values.count("master/frameworks_disconnected")); EXPECT_EQ(1u, stats.values.count("master/frameworks_active")); EXPECT_EQ(1u, stats.values.count("master/frameworks_inactive"));
