Store MasterInfo instead of UPID in the scheduler driver.

Review: https://reviews.apache.org/r/36562


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e0ed711b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e0ed711b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e0ed711b

Branch: refs/heads/master
Commit: e0ed711bf339907807131db71db54550cf9a3424
Parents: 866147a
Author: Benjamin Mahler <[email protected]>
Authored: Thu Jul 16 15:24:47 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Fri Jul 17 13:36:56 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp | 84 +++++++++++++++++++++++++-----------------------
 1 file changed, 44 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e0ed711b/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 8163796..fc33d24 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -232,7 +232,7 @@ protected:
     }
 
     if (_master.get().isSome()) {
-      master = UPID(_master.get().get().pid());
+      master = _master.get().get();
     } else {
       master = None();
     }
@@ -257,8 +257,8 @@ protected:
     connected = false;
 
     if (master.isSome()) {
-      LOG(INFO) << "New master detected at " << master.get();
-      link(master.get());
+      LOG(INFO) << "New master detected at " << master.get().pid();
+      link(master.get().pid());
 
       if (credential.isSome()) {
         // Authenticate with the master.
@@ -313,7 +313,7 @@ protected:
       return;
     }
 
-    LOG(INFO) << "Authenticating with master " << master.get();
+    LOG(INFO) << "Authenticating with master " << master.get().pid();
 
     CHECK_SOME(credential);
 
@@ -347,7 +347,7 @@ protected:
     // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'.
     // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade.
     authenticating =
-      authenticatee->authenticate(master.get(), self(), credential.get())
+      authenticatee->authenticate(master.get().pid(), self(), credential.get())
         .onAny(defer(self(), &Self::_authenticate));
 
     delay(Seconds(5),
@@ -383,7 +383,7 @@ protected:
 
     if (reauthenticate || !future.isReady()) {
       LOG(INFO)
-        << "Failed to authenticate with master " << master.get() << ": "
+        << "Failed to authenticate with master " << master.get().pid() << ": "
         << (reauthenticate ? "master changed" :
            (future.isFailed() ? future.failure() : "future discarded"));
 
@@ -396,12 +396,14 @@ protected:
     }
 
     if (!future.get()) {
-      LOG(ERROR) << "Master " << master.get() << " refused authentication";
+      LOG(ERROR) << "Master " << master.get().pid()
+                 << " refused authentication";
       error("Master refused authentication");
       return;
     }
 
-    LOG(INFO) << "Successfully authenticated with master " << master.get();
+    LOG(INFO) << "Successfully authenticated with master "
+              << master.get().pid();
 
     authenticated = true;
     authenticating = None();
@@ -581,11 +583,11 @@ protected:
       return;
     }
 
-    if (master != from) {
+    if (master.isNone() || from != master.get().pid()) {
       LOG(WARNING)
         << "Ignoring framework registered message because it was sent "
         << "from '" << from << "' instead of the leading master '"
-        << (master.isSome() ? master.get() : UPID()) << "'";
+        << (master.isSome() ? UPID(master.get().pid()) : UPID()) << "'";
       return;
     }
 
@@ -623,11 +625,11 @@ protected:
       return;
     }
 
-    if (master != from) {
+    if (master.isNone() || from != master.get().pid()) {
       LOG(WARNING)
         << "Ignoring framework re-registered message because it was sent "
         << "from '" << from << "' instead of the leading master '"
-        << (master.isSome() ? master.get() : UPID()) << "'";
+        << (master.isSome() ? UPID(master.get().pid()) : UPID()) << "'";
       return;
     }
 
@@ -662,19 +664,19 @@ protected:
       return;
     }
 
-    VLOG(1) << "Sending registration request to " << master.get();
+    VLOG(1) << "Sending registration request to " << master.get().pid();
 
     if (!framework.has_id() || framework.id() == "") {
       // Touched for the very first time.
       RegisterFrameworkMessage message;
       message.mutable_framework()->MergeFrom(framework);
-      send(master.get(), message);
+      send(master.get().pid(), message);
     } else {
       // Not the first time, or failing over.
       ReregisterFrameworkMessage message;
       message.mutable_framework()->MergeFrom(framework);
       message.set_failover(failover);
-      send(master.get(), message);
+      send(master.get().pid(), message);
     }
 
     // Bound the maximum backoff by 'REGISTRATION_RETRY_INTERVAL_MAX'.
@@ -721,10 +723,10 @@ protected:
 
     CHECK_SOME(master);
 
-    if (from != master.get()) {
+    if (from != master.get().pid()) {
       VLOG(1) << "Ignoring resource offers message because it was sent "
               << "from '" << from << "' instead of the leading master '"
-              << master.get() << "'";
+              << master.get().pid() << "'";
       return;
     }
 
@@ -771,10 +773,10 @@ protected:
 
     CHECK_SOME(master);
 
-    if (from != master.get()) {
+    if (from != master.get().pid()) {
       VLOG(1) << "Ignoring rescind offer message because it was sent "
               << "from '" << from << "' instead of the leading master '"
-              << master.get() << "'";
+              << master.get().pid() << "'";
       return;
     }
 
@@ -813,10 +815,10 @@ protected:
 
       CHECK_SOME(master);
 
-      if (from != master.get()) {
+      if (from != master.get().pid()) {
         VLOG(1) << "Ignoring status update message because it was sent "
                 << "from '" << from << "' instead of the leading master '"
-                << master.get() << "'";
+                << master.get().pid() << "'";
         return;
       }
     }
@@ -881,7 +883,7 @@ protected:
         CHECK_SOME(master);
 
         VLOG(2) << "Sending ACK for status update " << update
-            << " to " << master.get();
+                << " to " << master.get().pid();
 
         Call call;
 
@@ -895,7 +897,7 @@ protected:
         acknowledge->set_uuid(update.uuid());
 
         CHECK_SOME(master);
-        send(master.get(), call);
+        send(master.get().pid(), call);
       }
     }
   }
@@ -916,10 +918,10 @@ protected:
 
     CHECK_SOME(master);
 
-    if (from != master.get()) {
+    if (from != master.get().pid()) {
       VLOG(1) << "Ignoring lost slave message because it was sent "
               << "from '" << from << "' instead of the leading master '"
-              << master.get() << "'";
+              << master.get().pid() << "'";
       return;
     }
 
@@ -997,7 +999,7 @@ protected:
       call.set_type(Call::TEARDOWN);
 
       CHECK_SOME(master);
-      send(master.get(), call);
+      send(master.get().pid(), call);
     }
 
     synchronized (mutex) {
@@ -1023,7 +1025,7 @@ protected:
       DeactivateFrameworkMessage message;
       message.mutable_framework_id()->MergeFrom(framework.id());
       CHECK_SOME(master);
-      send(master.get(), message);
+      send(master.get().pid(), message);
     }
 
     synchronized (mutex) {
@@ -1048,7 +1050,7 @@ protected:
     kill->mutable_task_id()->CopyFrom(taskId);
 
     CHECK_SOME(master);
-    send(master.get(), call);
+    send(master.get().pid(), call);
   }
 
   void requestResources(const vector<Request>& requests)
@@ -1064,7 +1066,7 @@ protected:
       message.add_requests()->MergeFrom(request);
     }
     CHECK_SOME(master);
-    send(master.get(), message);
+    send(master.get().pid(), message);
   }
 
   void launchTasks(const vector<OfferID>& offerIds,
@@ -1117,12 +1119,12 @@ protected:
       return;
     }
 
-    Call message;
+    Call call;
     CHECK(framework.has_id());
-    message.mutable_framework_id()->CopyFrom(framework.id());
-    message.set_type(Call::ACCEPT);
+    call.mutable_framework_id()->CopyFrom(framework.id());
+    call.set_type(Call::ACCEPT);
 
-    Call::Accept* accept = message.mutable_accept();
+    Call::Accept* accept = call.mutable_accept();
 
     // Setting accept.operations.
     foreach (const Offer::Operation& _operation, operations) {
@@ -1168,7 +1170,7 @@ protected:
     accept->mutable_filters()->CopyFrom(filters);
 
     CHECK_SOME(master);
-    send(master.get(), message);
+    send(master.get().pid(), call);
   }
 
   void reviveOffers()
@@ -1185,7 +1187,7 @@ protected:
     call.set_type(Call::REVIVE);
 
     CHECK_SOME(master);
-    send(master.get(), call);
+    send(master.get().pid(), call);
   }
 
   void acknowledgeStatusUpdate(
@@ -1218,7 +1220,7 @@ protected:
       VLOG(2) << "Sending ACK for status update " << status.uuid()
               << " of task " << status.task_id()
               << " on slave " << status.slave_id()
-              << " to " << master.get();
+              << " to " << master.get().pid();
 
       Call call;
 
@@ -1231,7 +1233,7 @@ protected:
       acknowledge->mutable_task_id()->CopyFrom(status.task_id());
       acknowledge->set_uuid(status.uuid());
 
-      send(master.get(), call);
+      send(master.get().pid(), call);
     } else {
       VLOG(2) << "Received ACK for status update"
               << (status.has_uuid() ? " " + status.uuid() : "")
@@ -1287,7 +1289,7 @@ protected:
       message->set_data(data);
 
       CHECK_SOME(master);
-      send(master.get(), call);
+      send(master.get().pid(), call);
     }
   }
 
@@ -1315,7 +1317,7 @@ protected:
     }
 
     CHECK_SOME(master);
-    send(master.get(), call);
+    send(master.get().pid(), call);
   }
 
 private:
@@ -1366,8 +1368,10 @@ private:
   FrameworkInfo framework;
   pthread_mutex_t* mutex;
   pthread_cond_t* cond;
+
   bool failover;
-  Option<UPID> master;
+
+  Option<MasterInfo> master;
 
   bool connected; // Flag to indicate if framework is registered.
 

Reply via email to