Repository: mesos
Updated Branches:
  refs/heads/master b810250fa -> c96ba8f60


Introduced Master <-> Slave reconciliation.

Review: https://reviews.apache.org/r/26206


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc4444eb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc4444eb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc4444eb

Branch: refs/heads/master
Commit: cc4444eb47705649a4e93a4045b4d130dd4b3354
Parents: b810250
Author: Benjamin Mahler <[email protected]>
Authored: Tue Sep 30 11:31:53 2014 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Wed Oct 8 18:09:04 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp               | 95 +++++++++++++++++++-------------
 src/master/master.hpp               |  5 +-
 src/messages/messages.proto         |  2 +
 src/slave/slave.cpp                 | 79 +++++++++++++++++++++-----
 src/slave/slave.hpp                 |  7 ++-
 src/tests/fault_tolerance_tests.cpp | 12 +++-
 6 files changed, 142 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cc48b96..cb46cec 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3092,21 +3092,7 @@ void Master::reregisterSlave(
     // For now, we assume this slave is not nefarious (eventually
     // this will be handled by orthogonal security measures like key
     // based authentication).
-    LOG(WARNING) << "Slave at " << from << " (" << slave->info.hostname()
-                 << ") is being allowed to re-register with an already"
-                 << " in use id (" << slave->id << ")";
-
-    // TODO(bmahler): There's an implicit assumption here that when
-    // the master already knows about this slave, the slave cannot
-    // have tasks unknown to the master. This _should_ be the case
-    // since the causal relationship is:
-    //   slave removes task -> master removes task
-    // We should enforce this via a CHECK (dangerous), or by shutting
-    // down slaves that are found to violate this assumption.
-
-    SlaveReregisteredMessage message;
-    message.mutable_slave_id()->MergeFrom(slave->id);
-    send(from, message);
+    LOG(INFO) << "Re-registering slave " << *slave;
 
     // Update the slave pid and relink to it.
     // NOTE: Re-linking the slave here always rather than only when
@@ -3119,8 +3105,8 @@ void Master::reregisterSlave(
     link(slave->pid);
 
     // Reconcile tasks between master and the slave.
-    // NOTE: This needs to be done after the registration message is
-    // sent to the slave and the new pid is linked.
+    // NOTE: This sends the re-registered message, including tasks
+    // that need to be reconciled by the slave.
     reconcile(slave, executorInfos, tasks);
 
     // If this is a disconnected slave, add it back to the allocator.
@@ -3871,44 +3857,79 @@ void Master::reconcile(
 {
   CHECK_NOTNULL(slave);
 
+  // TODO(bmahler): There's an implicit assumption here that the
+  // slave cannot have tasks unknown to the master. This _should_ be
+  // the case since the causal relationship is:
+  //   slave removes task -> master removes task
+  // Add error logging for any violations of this assumption!
+
   // We convert the 'tasks' into a map for easier lookup below.
-  // TODO(vinod): Check if the tasks are known to the master.
   multihashmap<FrameworkID, TaskID> slaveTasks;
   foreach (const Task& task, tasks) {
     slaveTasks.put(task.framework_id(), task.task_id());
   }
 
-  // Send TASK_LOST updates for tasks present in the master but
-  // missing from the slave. This could happen if the task was
-  // dropped by the slave (e.g., slave exited before getting the
-  // task or the task was launched while slave was in recovery).
-  // NOTE: copies are needed because removeTask modifies slave->tasks.
+  // Look for tasks missing in the slave's re-registration message.
+  // This can occur when:
+  //   (1) a launch message was dropped (e.g. slave failed over), or
+  //   (2) the slave re-registration raced with a launch message, in
+  //       which case the slave actually received the task.
+  // To resolve both cases correctly, we must reconcile through the
+  // slave. For slaves that do not support reconciliation, we keep
+  // the old semantics and cover only case (1) via TASK_LOST.
+  SlaveReregisteredMessage reregistered;
+  reregistered.mutable_slave_id()->MergeFrom(slave->id);
+
+  // NOTE: copies are needed because removeTask modifies slave->tasks.
   foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+    ReconcileTasksMessage reconcile;
+    reconcile.mutable_framework_id()->CopyFrom(frameworkId);
+
     foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
       if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
         LOG(WARNING) << "Task " << task->task_id()
                      << " of framework " << task->framework_id()
                      << " unknown to the slave " << *slave
-                     << " during re-registration";
-
-        const StatusUpdate& update = protobuf::createStatusUpdate(
-            task->framework_id(),
-            slave->id,
-            task->task_id(),
-            TASK_LOST,
-            "Task is unknown to the slave");
+                     << " during re-registration"
+                     << (slave->version.isSome()
+                         ? ": reconciling with the slave"
+                         : ": sending TASK_LOST");
+
+        if (slave->version.isSome()) {
+          TaskStatus* status = reconcile.add_statuses();
+          status->mutable_task_id()->CopyFrom(task->task_id());
+          status->mutable_slave_id()->CopyFrom(slave->id);
+          status->set_state(task->state());
+          status->set_message("Reconciliation request");
+          status->set_timestamp(Clock::now().secs());
+        } else {
+          // TODO(bmahler): Remove this case in 0.22.0.
+          const StatusUpdate& update = protobuf::createStatusUpdate(
+              task->framework_id(),
+              slave->id,
+              task->task_id(),
+              TASK_LOST,
+              "Task is unknown to the slave");
+
+          updateTask(task, update.status());
+          removeTask(task);
 
-        updateTask(task, update.status());
-        removeTask(task);
-
-        Framework* framework = getFramework(frameworkId);
-        if (framework != NULL) {
-          forward(update, UPID(), framework);
+          Framework* framework = getFramework(frameworkId);
+          if (framework != NULL) {
+            forward(update, UPID(), framework);
+          }
         }
       }
     }
+
+    if (slave->version.isSome() && reconcile.statuses_size() > 0) {
+      reregistered.add_reconciliations()->CopyFrom(reconcile);
+    }
   }
 
+  // Re-register the slave.
+  send(slave->pid, reregistered);
+
   // Likewise, any executors that are present in the master but
   // not present in the slave must be removed to correctly account
   // for resources. First we index the executors for fast lookup below.

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 49589f4..14f1d0f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -292,9 +292,8 @@ protected:
   // Invoked when the contender has entered the contest.
   void contended(const process::Future<process::Future<Nothing> >& candidacy);
 
-  // Reconciles a re-registering slave's tasks / executors and sends
-  // TASK_LOST updates for tasks known to the master but unknown to
-  // the slave.
+  // Handles a known re-registering slave by reconciling the master's
+  // view of the slave's tasks and executors.
   void reconcile(
       Slave* slave,
       const std::vector<ExecutorInfo>& executors,

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index edf1e4e..77515d9 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -259,6 +259,8 @@ message SlaveRegisteredMessage {
 
 message SlaveReregisteredMessage {
   required SlaveID slave_id = 1;
+
+  repeated ReconcileTasksMessage reconciliations = 2;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 809b008..cb37599 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -347,7 +347,8 @@ void Slave::initialize()
 
   install<SlaveReregisteredMessage>(
       &Slave::reregistered,
-      &SlaveReregisteredMessage::slave_id);
+      &SlaveReregisteredMessage::slave_id,
+      &SlaveReregisteredMessage::reconciliations);
 
   install<RunTaskMessage>(
       &Slave::runTask,
@@ -810,7 +811,10 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
 }
 
 
-void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
+void Slave::reregistered(
+    const UPID& from,
+    const SlaveID& slaveId,
+    const vector<ReconcileTasksMessage>& reconciliations)
 {
   if (master != from) {
     LOG(WARNING) << "Ignoring re-registration message from " << from
@@ -823,25 +827,15 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
     case DISCONNECTED:
       CHECK_SOME(master);
       LOG(INFO) << "Re-registered with master " << master.get();
-
       state = RUNNING;
-      if (!(info.id() == slaveId)) {
-        EXIT(1) << "Re-registered but got wrong id: " << slaveId
-                << "(expected: " << info.id() << "). Committing suicide";
-      }
       break;
     case RUNNING:
-      // Already re-registered!
-      if (!(info.id() == slaveId)) {
-        EXIT(1) << "Re-registered but got wrong id: " << slaveId
-                << "(expected: " << info.id() << "). Committing suicide";
-      }
       CHECK_SOME(master);
       LOG(WARNING) << "Already re-registered with master " << master.get();
       break;
     case TERMINATING:
       LOG(WARNING) << "Ignoring re-registration because slave is terminating";
-      break;
+      return;
     case RECOVERING:
       // It's possible to receive a message intended for the previous
       // run of the slave here. Short term we can leave this as is and
@@ -851,7 +845,64 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
       // https://issues.apache.org/jira/browse/MESOS-677
     default:
       LOG(FATAL) << "Unexpected slave state " << state;
-      break;
+      return;;
+  }
+
+  if (!(info.id() == slaveId)) {
+    EXIT(1) << "Re-registered but got wrong id: " << slaveId
+            << "(expected: " << info.id() << "). Committing suicide";
+  }
+
+  // Reconcile any tasks per the master's request.
+  foreach (const ReconcileTasksMessage& reconcile, reconciliations) {
+    Framework* framework = getFramework(reconcile.framework_id());
+
+    foreach (const TaskStatus& status, reconcile.statuses()) {
+      const TaskID& taskId = status.task_id();
+
+      bool known = false;
+
+      // Try to locate the task.
+      if (framework != NULL) {
+        foreachkey (const ExecutorID& executorId, framework->pending) {
+          if (framework->pending[executorId].contains(taskId)) {
+            known = true;
+          }
+        }
+        foreachvalue (Executor* executor, framework->executors) {
+          if (executor->queuedTasks.contains(taskId) ||
+              executor->launchedTasks.contains(taskId) ||
+              executor->terminatedTasks.contains(taskId)) {
+            known = true;
+          }
+        }
+      }
+
+      // We only need to send a TASK_LOST update when the task is
+      // unknown (so that the master removes it). Otherwise, the
+      // master correctly holds the task and will receive updates.
+      if (!known) {
+        LOG(WARNING) << "Slave reconciling task " << taskId
+                     << " of framework " << reconcile.framework_id()
+                     << " in state TASK_LOST: task unknown to the slave";
+
+        const StatusUpdate& update = protobuf::createStatusUpdate(
+            reconcile.framework_id(),
+            info.id(),
+            taskId,
+            TASK_LOST,
+            "Reconciliation: task unknown to the slave");
+
+        // NOTE: We can't use statusUpdate() here because it drops
+        // updates for unknown frameworks.
+        statusUpdateManager->update(update, info.id())
+          .onAny(defer(self(),
+                       &Slave::__statusUpdate,
+                       lambda::_1,
+                       update,
+                       UPID()));
+      }
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 2869710..76d505c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -97,7 +97,12 @@ public:
   void shutdown(const process::UPID& from, const std::string& message);
 
   void registered(const process::UPID& from, const SlaveID& slaveId);
-  void reregistered(const process::UPID& from, const SlaveID& slaveId);
+
+  void reregistered(
+      const process::UPID& from,
+      const SlaveID& slaveId,
+      const std::vector<ReconcileTasksMessage>& reconciliations);
+
   void doReliableRegistration(const Duration& duration);
 
   void runTask(

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index e8f5322..c34a9d6 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -2233,9 +2233,9 @@ TEST_F(FaultToleranceTest, FrameworkReregisterEmptyExecutor)
 }
 
 
-// This test verifies that the master sends TASK_LOST updates
-// for tasks in the master absent from the re-registered slave.
-// We do this by dropping RunTaskMessage from master to the slave.
+// This test verifies that the master reconciles tasks that are
+// missing from a re-registering slave. In this case, we drop the
+// RunTaskMessage so the slave should send TASK_LOST.
 TEST_F(FaultToleranceTest, ReconcileLostTasks)
 {
   Try<PID<Master> > master = StartMaster();
@@ -2285,6 +2285,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
   Future<SlaveReregisteredMessage> slaveReregisteredMessage =
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
   Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status));
@@ -2295,6 +2298,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
 
   AWAIT_READY(slaveReregisteredMessage);
 
+  // Make sure the slave generated the TASK_LOST.
+  AWAIT_READY(statusUpdateMessage);
+
   AWAIT_READY(status);
 
   ASSERT_EQ(task.task_id(), status.get().task_id());

Reply via email to