Store MasterInfo instead of UPID in the scheduler driver.

Review: https://reviews.apache.org/r/36562
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e0ed711b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e0ed711b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e0ed711b
Branch: refs/heads/master
Commit: e0ed711bf339907807131db71db54550cf9a3424
Parents: 866147a
Author: Benjamin Mahler <[email protected]>
Authored: Thu Jul 16 15:24:47 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Fri Jul 17 13:36:56 2015 -0700
----------------------------------------------------------------------
 src/sched/sched.cpp | 84 +++++++++++++++++++++++++-----------------------
 1 file changed, 44 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e0ed711b/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 8163796..fc33d24 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -232,7 +232,7 @@ protected: } if (_master.get().isSome()) { - master = UPID(_master.get().get().pid()); + master = _master.get().get(); } else { master = None(); } @@ -257,8 +257,8 @@ protected: connected = false; if (master.isSome()) { - LOG(INFO) << "New master detected at " << master.get(); - link(master.get()); + LOG(INFO) << "New master detected at " << master.get().pid(); + link(master.get().pid()); if (credential.isSome()) { // Authenticate with the master. @@ -313,7 +313,7 @@ protected: return; } - LOG(INFO) << "Authenticating with master " << master.get(); + LOG(INFO) << "Authenticating with master " << master.get().pid(); CHECK_SOME(credential); @@ -347,7 +347,7 @@ protected: // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'. // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade. 
authenticating = - authenticatee->authenticate(master.get(), self(), credential.get()) + authenticatee->authenticate(master.get().pid(), self(), credential.get()) .onAny(defer(self(), &Self::_authenticate)); delay(Seconds(5), @@ -383,7 +383,7 @@ protected: if (reauthenticate || !future.isReady()) { LOG(INFO) - << "Failed to authenticate with master " << master.get() << ": " + << "Failed to authenticate with master " << master.get().pid() << ": " << (reauthenticate ? "master changed" : (future.isFailed() ? future.failure() : "future discarded")); @@ -396,12 +396,14 @@ protected: } if (!future.get()) { - LOG(ERROR) << "Master " << master.get() << " refused authentication"; + LOG(ERROR) << "Master " << master.get().pid() + << " refused authentication"; error("Master refused authentication"); return; } - LOG(INFO) << "Successfully authenticated with master " << master.get(); + LOG(INFO) << "Successfully authenticated with master " + << master.get().pid(); authenticated = true; authenticating = None(); @@ -581,11 +583,11 @@ protected: return; } - if (master != from) { + if (master.isNone() || from != master.get().pid()) { LOG(WARNING) << "Ignoring framework registered message because it was sent " << "from '" << from << "' instead of the leading master '" - << (master.isSome() ? master.get() : UPID()) << "'"; + << (master.isSome() ? UPID(master.get().pid()) : UPID()) << "'"; return; } @@ -623,11 +625,11 @@ protected: return; } - if (master != from) { + if (master.isNone() || from != master.get().pid()) { LOG(WARNING) << "Ignoring framework re-registered message because it was sent " << "from '" << from << "' instead of the leading master '" - << (master.isSome() ? master.get() : UPID()) << "'"; + << (master.isSome() ? 
UPID(master.get().pid()) : UPID()) << "'"; return; } @@ -662,19 +664,19 @@ protected: return; } - VLOG(1) << "Sending registration request to " << master.get(); + VLOG(1) << "Sending registration request to " << master.get().pid(); if (!framework.has_id() || framework.id() == "") { // Touched for the very first time. RegisterFrameworkMessage message; message.mutable_framework()->MergeFrom(framework); - send(master.get(), message); + send(master.get().pid(), message); } else { // Not the first time, or failing over. ReregisterFrameworkMessage message; message.mutable_framework()->MergeFrom(framework); message.set_failover(failover); - send(master.get(), message); + send(master.get().pid(), message); } // Bound the maximum backoff by 'REGISTRATION_RETRY_INTERVAL_MAX'. @@ -721,10 +723,10 @@ protected: CHECK_SOME(master); - if (from != master.get()) { + if (from != master.get().pid()) { VLOG(1) << "Ignoring resource offers message because it was sent " << "from '" << from << "' instead of the leading master '" - << master.get() << "'"; + << master.get().pid() << "'"; return; } @@ -771,10 +773,10 @@ protected: CHECK_SOME(master); - if (from != master.get()) { + if (from != master.get().pid()) { VLOG(1) << "Ignoring rescind offer message because it was sent " << "from '" << from << "' instead of the leading master '" - << master.get() << "'"; + << master.get().pid() << "'"; return; } @@ -813,10 +815,10 @@ protected: CHECK_SOME(master); - if (from != master.get()) { + if (from != master.get().pid()) { VLOG(1) << "Ignoring status update message because it was sent " << "from '" << from << "' instead of the leading master '" - << master.get() << "'"; + << master.get().pid() << "'"; return; } } @@ -881,7 +883,7 @@ protected: CHECK_SOME(master); VLOG(2) << "Sending ACK for status update " << update - << " to " << master.get(); + << " to " << master.get().pid(); Call call; @@ -895,7 +897,7 @@ protected: acknowledge->set_uuid(update.uuid()); CHECK_SOME(master); - 
send(master.get(), call); + send(master.get().pid(), call); } } } @@ -916,10 +918,10 @@ protected: CHECK_SOME(master); - if (from != master.get()) { + if (from != master.get().pid()) { VLOG(1) << "Ignoring lost slave message because it was sent " << "from '" << from << "' instead of the leading master '" - << master.get() << "'"; + << master.get().pid() << "'"; return; } @@ -997,7 +999,7 @@ protected: call.set_type(Call::TEARDOWN); CHECK_SOME(master); - send(master.get(), call); + send(master.get().pid(), call); } synchronized (mutex) { @@ -1023,7 +1025,7 @@ protected: DeactivateFrameworkMessage message; message.mutable_framework_id()->MergeFrom(framework.id()); CHECK_SOME(master); - send(master.get(), message); + send(master.get().pid(), message); } synchronized (mutex) { @@ -1048,7 +1050,7 @@ protected: kill->mutable_task_id()->CopyFrom(taskId); CHECK_SOME(master); - send(master.get(), call); + send(master.get().pid(), call); } void requestResources(const vector<Request>& requests) @@ -1064,7 +1066,7 @@ protected: message.add_requests()->MergeFrom(request); } CHECK_SOME(master); - send(master.get(), message); + send(master.get().pid(), message); } void launchTasks(const vector<OfferID>& offerIds, @@ -1117,12 +1119,12 @@ protected: return; } - Call message; + Call call; CHECK(framework.has_id()); - message.mutable_framework_id()->CopyFrom(framework.id()); - message.set_type(Call::ACCEPT); + call.mutable_framework_id()->CopyFrom(framework.id()); + call.set_type(Call::ACCEPT); - Call::Accept* accept = message.mutable_accept(); + Call::Accept* accept = call.mutable_accept(); // Setting accept.operations. 
foreach (const Offer::Operation& _operation, operations) { @@ -1168,7 +1170,7 @@ protected: accept->mutable_filters()->CopyFrom(filters); CHECK_SOME(master); - send(master.get(), message); + send(master.get().pid(), call); } void reviveOffers() @@ -1185,7 +1187,7 @@ protected: call.set_type(Call::REVIVE); CHECK_SOME(master); - send(master.get(), call); + send(master.get().pid(), call); } void acknowledgeStatusUpdate( @@ -1218,7 +1220,7 @@ protected: VLOG(2) << "Sending ACK for status update " << status.uuid() << " of task " << status.task_id() << " on slave " << status.slave_id() - << " to " << master.get(); + << " to " << master.get().pid(); Call call; @@ -1231,7 +1233,7 @@ protected: acknowledge->mutable_task_id()->CopyFrom(status.task_id()); acknowledge->set_uuid(status.uuid()); - send(master.get(), call); + send(master.get().pid(), call); } else { VLOG(2) << "Received ACK for status update" << (status.has_uuid() ? " " + status.uuid() : "") @@ -1287,7 +1289,7 @@ protected: message->set_data(data); CHECK_SOME(master); - send(master.get(), call); + send(master.get().pid(), call); } } @@ -1315,7 +1317,7 @@ protected: } CHECK_SOME(master); - send(master.get(), call); + send(master.get().pid(), call); } private: @@ -1366,8 +1368,10 @@ private: FrameworkInfo framework; pthread_mutex_t* mutex; pthread_cond_t* cond; + bool failover; - Option<UPID> master; + + Option<MasterInfo> master; bool connected; // Flag to indicate if framework is registered.
