Repository: mesos
Updated Branches:
  refs/heads/master 30fdabe1a -> 4700fadf0


Expose pending tasks during reconciliation.

Review: https://reviews.apache.org/r/24516


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4700fadf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4700fadf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4700fadf

Branch: refs/heads/master
Commit: 4700fadf0cb618489a91f48b6e57719b69ea0389
Parents: 85303d1
Author: Benjamin Mahler <[email protected]>
Authored: Fri Aug 8 16:19:27 2014 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Wed Aug 13 11:54:23 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp                    | 35 +++++++++---
 src/tests/master_authorization_tests.cpp | 78 ---------------------------
 src/tests/reconciliation_tests.cpp       | 44 ++++++---------
 3 files changed, 44 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f40a1cd..e948803 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3302,7 +3302,26 @@ void Master::reconcileTasks(
     LOG(INFO) << "Performing implicit task state reconciliation for framework "
               << frameworkId;
 
-    // TODO(bmahler): Consider sending completed tasks?
+    foreachvalue (const TaskInfo& task, framework->pendingTasks) {
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          frameworkId,
+          task.slave_id(),
+          task.task_id(),
+          TASK_STAGING,
+          "Reconciliation: Latest task state");
+
+      VLOG(1) << "Sending implicit reconciliation state "
+              << update.status().state()
+              << " for task " << update.status().task_id()
+              << " of framework " << frameworkId;
+
+      // TODO(bmahler): Consider using forward(); might lead to too
+      // much logging.
+      StatusUpdateMessage message;
+      message.mutable_update()->CopyFrom(update);
+      send(framework->pid, message);
+    }
+
     foreachvalue (Task* task, framework->tasks) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           frameworkId,
@@ -3331,7 +3350,7 @@ void Master::reconcileTasks(
             << statuses.size() << " tasks of framework " << frameworkId;
 
   // Explicit reconciliation occurs for the following cases:
-  //   (1) Task is known, but pending: no-op.
+  //   (1) Task is known, but pending: TASK_STAGING.
   //   (2) Task is known: send the latest state.
   //   (3) Task is unknown, slave is registered: TASK_LOST.
   //   (4) Task is unknown, slave is transitioning: no-op.
@@ -3353,10 +3372,14 @@ void Master::reconcileTasks(
     Task* task = framework->getTask(status.task_id());
 
     if (framework->pendingTasks.contains(status.task_id())) {
-      // (1) Task is known, but pending: no-op.
-      LOG(INFO) << "Ignoring reconciliation request of task "
-                << status.task_id() << " from framework " << frameworkId
-                << " because the task is pending";
+      // (1) Task is known, but pending: TASK_STAGING.
+      const TaskInfo& task_ = framework->pendingTasks[status.task_id()];
+      update = protobuf::createStatusUpdate(
+          frameworkId,
+          task_.slave_id(),
+          task_.task_id(),
+          TASK_STAGING,
+          "Reconciliation: Latest task state");
     } else if (task != NULL) {
       // (2) Task is known: send the latest state.
       update = protobuf::createStatusUpdate(

http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp 
b/src/tests/master_authorization_tests.cpp
index f0f0648..b9aa7bf 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -523,84 +523,6 @@ TEST_F(MasterAuthorizationTest, FrameworkRemoved)
 }
 
 
-// This test verifies that a reconciliation request that comes before
-// '_launchTasks()' is ignored.
-TEST_F(MasterAuthorizationTest, ReconcileTask)
-{
-  MockAuthorizer authorizer;
-  Try<PID<Master> > master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-
-  Try<PID<Slave> > slave = StartSlave(&exec);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .Times(1);
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-
-  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-  vector<TaskInfo> tasks;
-  tasks.push_back(task);
-
-  // Return a pending future from authorizer.
-  Future<Nothing> future;
-  Promise<bool> promise;
-  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future),
-                    Return(promise.future())));
-
-  driver.launchTasks(offers.get()[0].id(), tasks);
-
-  // Wait until authorization is in progress.
-  AWAIT_READY(future);
-
-  // Scheduler shouldn't get an update from reconciliation.
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .Times(0);
-
-  Future<ReconcileTasksMessage> reconcileTasksMessage =
-    FUTURE_PROTOBUF(ReconcileTasksMessage(), _, _);
-
-  vector<TaskStatus> statuses;
-
-  TaskStatus status;
-  status.mutable_task_id()->CopyFrom(task.task_id());
-  status.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
-  status.set_state(TASK_STAGING);
-
-  statuses.push_back(status);
-
-  driver.reconcileTasks(statuses);
-
-  AWAIT_READY(reconcileTasksMessage);
-
-  // Make sure the framework doesn't receive any update.
-  Clock::pause();
-  Clock::settle();
-
-  // Now stop the framework.
-  driver.stop();
-  driver.join();
-
-  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
-}
-
-
 // This test verifies that two tasks each launched on a different
 // slave with same executor id but different executor info are
 // allowed even when the first task is pending due to authorization.

http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp 
b/src/tests/reconciliation_tests.cpp
index 3c4d7ed..8c66659 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -578,8 +578,7 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask)
 
 
 // This test ensures that reconciliation requests for tasks that are
-// pending (due to validation/authorization) do not result in status
-// updates.
+// pending are exposed in reconciliation.
 TEST_F(ReconciliationTest, PendingTask)
 {
   MockAuthorizer authorizer;
@@ -615,10 +614,6 @@ TEST_F(ReconciliationTest, PendingTask)
   AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
 
-  // Framework should not receive any update.
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .Times(0);
-
   // Return a pending future from authorizer.
   Future<Nothing> future;
   Promise<bool> promise;
@@ -635,43 +630,34 @@ TEST_F(ReconciliationTest, PendingTask)
   // Wait until authorization is in progress.
   AWAIT_READY(future);
 
-  // First send an implicit reconciliation request for this task,
-  // there should be no updates.
-  Future<ReconcileTasksMessage> reconcileTasksMessage =
-    FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
-
-  Clock::pause();
+  // First send an implicit reconciliation request for this task.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update));
 
   vector<TaskStatus> statuses;
   driver.reconcileTasks(statuses);
 
-  // Make sure the master received the reconcile tasks message.
-  AWAIT_READY(reconcileTasksMessage);
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_STAGING, update.get().state());
+  EXPECT_TRUE(update.get().has_slave_id());
 
-  // The Clock::settle() will ensure that framework would receive
-  // a status update if it is sent by the master. In this test it
-  // shouldn't receive any.
-  Clock::settle();
+  // Now send an explicit reconciliation request for this task.
+  Future<TaskStatus> update2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update2));
 
-  // Now send an explicit reconciliation request for this task;
-  // there should be no updates.
   TaskStatus status;
   status.mutable_task_id()->CopyFrom(task.task_id());
   status.mutable_slave_id()->CopyFrom(slaveId);
   status.set_state(TASK_STAGING);
   statuses.push_back(status);
 
-  reconcileTasksMessage = FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
-
   driver.reconcileTasks(statuses);
 
-  // Make sure the master received the reconcile tasks message.
-  AWAIT_READY(reconcileTasksMessage);
-
-  // The Clock::settle() will ensure that framework would receive
-  // a status update if it is sent by the master. In this test it
-  // shouldn't receive any.
-  Clock::settle();
+  AWAIT_READY(update2);
+  EXPECT_EQ(TASK_STAGING, update2.get().state());
+  EXPECT_TRUE(update2.get().has_slave_id());
 
   driver.stop();
   driver.join();

Reply via email to