Repository: mesos
Updated Branches:
  refs/heads/master 616d40185 -> e960cdffe


Updated slave re-registration to send unacknowledged task states.

Review: https://reviews.apache.org/r/26699


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ca14f37b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ca14f37b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ca14f37b

Branch: refs/heads/master
Commit: ca14f37bf7321a977e297e974e9c4c1f0cc57e0e
Parents: 65c3c36
Author: Vinod Kone <[email protected]>
Authored: Fri Oct 10 12:16:32 2014 -0700
Committer: Vinod Kone <[email protected]>
Committed: Tue Oct 21 15:47:08 2014 -0700

----------------------------------------------------------------------
 src/messages/messages.proto         | 10 +++-
 src/slave/slave.cpp                 | 45 +++++++++++++++
 src/tests/fault_tolerance_tests.cpp |  3 -
 src/tests/slave_tests.cpp           | 99 ++++++++++++++++++++++++++++++++
 4 files changed, 153 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ca14f37b/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 0dfc1b7..196d1d4 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -44,11 +44,18 @@ message Task {
   required FrameworkID framework_id = 3;
   optional ExecutorID executor_id = 4;
   required SlaveID slave_id = 5;
-  required TaskState state = 6;
+  required TaskState state = 6; // Latest state of the task.
   repeated Resource resources = 7;
   repeated TaskStatus statuses = 8;
+
+  // These fields correspond to the state and uuid of the latest
+  // status update forwarded to the master.
+  // NOTE: Either both fields must be set or both must be unset.
+  optional TaskState status_update_state = 9;
+  optional bytes status_update_uuid = 10;
 }
 
+
 // Describes a role, which are used to group frameworks for allocation
 // decisions, depending on the allocation policy being used.
 // The weight field can be used to indicate forms of priority.
@@ -58,6 +65,7 @@ message RoleInfo {
 }
 
 
+// TODO(vinod): Create a new UUID message type.
 message StatusUpdate {
   required FrameworkID framework_id = 1;
   optional ExecutorID executor_id = 2;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca14f37b/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a98e408..a5761ed 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -957,12 +957,16 @@ void Slave::doReliableRegistration(const Duration& duration)
         // Add launched, terminated, and queued tasks.
         // Note that terminated executors will only have terminated
         // unacknowledged tasks.
+        // Note that for each task the latest state and the status
+        // update state (if any) are also included.
         foreach (Task* task, executor->launchedTasks.values()) {
           message.add_tasks()->CopyFrom(*task);
         }
+
         foreach (Task* task, executor->terminatedTasks.values()) {
           message.add_tasks()->CopyFrom(*task);
         }
+
         foreach (const TaskInfo& task, executor->queuedTasks.values()) {
           message.add_tasks()->CopyFrom(protobuf::createTask(
               task, TASK_STAGING, framework->id));
@@ -2381,9 +2385,50 @@ void Slave::forward(const StatusUpdate& update)
     return;
   }
 
+  // Update the status update state of the task.
+  Framework* framework = getFramework(update.framework_id());
+  if (framework != NULL) {
+    const TaskID& taskId = update.status().task_id();
+    Executor* executor = framework->getExecutor(taskId);
+    if (executor != NULL) {
+      // NOTE: We do not look for the task in queued tasks because
+      // no update is expected for it until it's launched. Similarly,
+      // we do not look for completed tasks because the state for a
+      // completed task shouldn't be changed.
+      Task* task = NULL;
+      if (executor->launchedTasks.contains(taskId)) {
+        task = executor->launchedTasks[taskId];
+      } else if (executor->terminatedTasks.contains(taskId)) {
+        task = executor->terminatedTasks[taskId];
+      }
+
+      // We set the status update state of the task here because in
+      // steady state the master updates the status update state of the
+      // task when it receives this update. If the master fails over,
+      // the slave re-registers with this task and this status update
+      // state. Note that an acknowledgement for this update might be
+      // enqueued on the status update manager when we are here. But
+      // that is ok because the status update state will be updated
+      // when the next update is forwarded to the master.
+      if (task != NULL) {
+        task->set_status_update_state(update.status().state());
+        task->set_status_update_uuid(update.uuid());
+      }
+    }
+  }
+
   CHECK_SOME(master);
   LOG(INFO) << "Forwarding the update " << update << " to " << master.get();
 
+  // NOTE: We forward the update even if framework/executor/task
+  // doesn't exist because the status update manager will be expecting
+  // an acknowledgement for the update. This could happen for example
+  // if this is a retried terminal update and before we are here the
+  // slave has already processed the acknowledgement of the original
+  // update and removed the framework/executor/task. Also, slave
+  // re-registration can generate updates when framework/executor/task
+  // are unknown.
+
   // Forward the update to master.
   StatusUpdateMessage message;
   message.mutable_update()->MergeFrom(update);

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca14f37b/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index a75910d..a18a41a 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -126,9 +126,6 @@ TEST_F(FaultToleranceTest, SlaveLost)
 }
 
 
-
-
-
 // The purpose of this test is to ensure that when slaves are removed
 // from the master, and then attempt to send status updates, we send
 // a ShutdownMessage to the slave. Why? Because during a network

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca14f37b/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 759670a..a1bd00c 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1155,3 +1155,102 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
+
+
+// This test verifies that on re-registration the slave reports each task
+// with its latest state and its latest forwarded status update state/uuid.
+TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)
+{
+  // Start a master.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  // Create a StandaloneMasterDetector to enable the slave to trigger
+  // re-registration later.
+  StandaloneMasterDetector detector(master.get());
+
+  // Start a slave.
+  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Drop (and capture) the TASK_RUNNING update so it stays unacknowledged.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+  driver.start();
+
+  // Pause the clock to avoid status update retries.
+  Clock::pause();
+
+  // Wait until TASK_RUNNING is sent to the master.
+  AWAIT_READY(statusUpdateMessage);
+
+  // Ensure the status update manager has handled the TASK_RUNNING update.
+  AWAIT_READY(__statusUpdate);
+
+  Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+  // Now send a TASK_FINISHED update for the same task.
+  TaskStatus finishedStatus;
+  finishedStatus = statusUpdateMessage.get().update().status();
+  finishedStatus.set_state(TASK_FINISHED);
+  execDriver->sendStatusUpdate(finishedStatus);
+
+  // Ensure the status update manager has handled the TASK_FINISHED update.
+  AWAIT_READY(__statusUpdate2);
+
+  Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  // Drop all further status updates sent to the re-appointed master.
+  DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+
+  // Simulate a new master detected event on the slave,
+  // so that the slave will do a re-registration.
+  detector.appoint(master.get());
+
+  // Capture and inspect the slave reregistration message.
+  AWAIT_READY(reregisterSlaveMessage);
+
+  ASSERT_EQ(1, reregisterSlaveMessage.get().tasks_size());
+
+  // The latest state of the task should be TASK_FINISHED.
+  ASSERT_EQ(TASK_FINISHED, reregisterSlaveMessage.get().tasks(0).state());
+
+  // The status update state of the task should be TASK_RUNNING.
+  ASSERT_EQ(TASK_RUNNING,
+            reregisterSlaveMessage.get().tasks(0).status_update_state());
+
+  // The status update uuid should match the TASK_RUNNING's uuid.
+  ASSERT_EQ(statusUpdateMessage.get().update().uuid(),
+            reregisterSlaveMessage.get().tasks(0).status_update_uuid());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}

Reply via email to