Repository: mesos Updated Branches: refs/heads/master 30fdabe1a -> 4700fadf0
Expose pending tasks during reconciliation. Review: https://reviews.apache.org/r/24516 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4700fadf Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4700fadf Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4700fadf Branch: refs/heads/master Commit: 4700fadf0cb618489a91f48b6e57719b69ea0389 Parents: 85303d1 Author: Benjamin Mahler <[email protected]> Authored: Fri Aug 8 16:19:27 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Aug 13 11:54:23 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 35 +++++++++--- src/tests/master_authorization_tests.cpp | 78 --------------------------- src/tests/reconciliation_tests.cpp | 44 ++++++--------- 3 files changed, 44 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index f40a1cd..e948803 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3302,7 +3302,26 @@ void Master::reconcileTasks( LOG(INFO) << "Performing implicit task state reconciliation for framework " << frameworkId; - // TODO(bmahler): Consider sending completed tasks? + foreachvalue (const TaskInfo& task, framework->pendingTasks) { + const StatusUpdate& update = protobuf::createStatusUpdate( + frameworkId, + task.slave_id(), + task.task_id(), + TASK_STAGING, + "Reconciliation: Latest task state"); + + VLOG(1) << "Sending implicit reconciliation state " + << update.status().state() + << " for task " << update.status().task_id() + << " of framework " << frameworkId; + + // TODO(bmahler): Consider using forward(); might lead to too + // much logging. + StatusUpdateMessage message; + message.mutable_update()->CopyFrom(update); + send(framework->pid, message); + } + foreachvalue (Task* task, framework->tasks) { const StatusUpdate& update = protobuf::createStatusUpdate( frameworkId, @@ -3331,7 +3350,7 @@ void Master::reconcileTasks( << statuses.size() << " tasks of framework " << frameworkId; // Explicit reconciliation occurs for the following cases: - // (1) Task is known, but pending: no-op. + // (1) Task is known, but pending: TASK_STAGING. // (2) Task is known: send the latest state. // (3) Task is unknown, slave is registered: TASK_LOST. // (4) Task is unknown, slave is transitioning: no-op. @@ -3353,10 +3372,14 @@ void Master::reconcileTasks( Task* task = framework->getTask(status.task_id()); if (framework->pendingTasks.contains(status.task_id())) { - // (1) Task is known, but pending: no-op. - LOG(INFO) << "Ignoring reconciliation request of task " - << status.task_id() << " from framework " << frameworkId - << " because the task is pending"; + // (1) Task is known, but pending: TASK_STAGING. + const TaskInfo& task_ = framework->pendingTasks[status.task_id()]; + update = protobuf::createStatusUpdate( + frameworkId, + task_.slave_id(), + task_.task_id(), + TASK_STAGING, + "Reconciliation: Latest task state"); } else if (task != NULL) { // (2) Task is known: send the latest state. update = protobuf::createStatusUpdate( http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/tests/master_authorization_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index f0f0648..b9aa7bf 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -523,84 +523,6 @@ TEST_F(MasterAuthorizationTest, FrameworkRemoved) } -// This test verifies that a reconciliation request that comes before -// '_launchTasks()' is ignored. -TEST_F(MasterAuthorizationTest, ReconcileTask) -{ - MockAuthorizer authorizer; - Try<PID<Master> > master = StartMaster(&authorizer); - ASSERT_SOME(master); - - MockExecutor exec(DEFAULT_EXECUTOR_ID); - - Try<PID<Slave> > slave = StartSlave(&exec); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)) - .Times(1); - - Future<vector<Offer> > offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - driver.start(); - - AWAIT_READY(offers); - EXPECT_NE(0u, offers.get().size()); - - TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); - vector<TaskInfo> tasks; - tasks.push_back(task); - - // Return a pending future from authorizer. - Future<Nothing> future; - Promise<bool> promise; - EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>())) - .WillOnce(DoAll(FutureSatisfy(&future), - Return(promise.future()))); - - driver.launchTasks(offers.get()[0].id(), tasks); - - // Wait until authorization is in progress. - AWAIT_READY(future); - - // Scheduler shouldn't get an update from reconciliation. - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .Times(0); - - Future<ReconcileTasksMessage> reconcileTasksMessage = - FUTURE_PROTOBUF(ReconcileTasksMessage(), _, _); - - vector<TaskStatus> statuses; - - TaskStatus status; - status.mutable_task_id()->CopyFrom(task.task_id()); - status.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id()); - status.set_state(TASK_STAGING); - - statuses.push_back(status); - - driver.reconcileTasks(statuses); - - AWAIT_READY(reconcileTasksMessage); - - // Make sure the framework doesn't receive any update. - Clock::pause(); - Clock::settle(); - - // Now stop the framework. - driver.stop(); - driver.join(); - - Shutdown(); // Must shutdown before 'containerizer' gets deallocated. -} - - // This test verifies that two tasks each launched on a different // slave with same executor id but different executor info are // allowed even when the first task is pending due to authorization. http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/tests/reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index 3c4d7ed..8c66659 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -578,8 +578,7 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask) // This test ensures that reconciliation requests for tasks that are -// pending (due to validation/authorization) do not result in status -// updates. +// pending are exposed in reconciliation. TEST_F(ReconciliationTest, PendingTask) { MockAuthorizer authorizer; @@ -615,10 +614,6 @@ TEST_F(ReconciliationTest, PendingTask) AWAIT_READY(offers); EXPECT_NE(0u, offers.get().size()); - // Framework should not receive any update. - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .Times(0); - // Return a pending future from authorizer. Future<Nothing> future; Promise<bool> promise; @@ -635,43 +630,34 @@ TEST_F(ReconciliationTest, PendingTask) // Wait until authorization is in progress. AWAIT_READY(future); - // First send an implicit reconciliation request for this task, - // there should be no updates. - Future<ReconcileTasksMessage> reconcileTasksMessage = - FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _); - - Clock::pause(); + // First send an implicit reconciliation request for this task. + Future<TaskStatus> update; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&update)); vector<TaskStatus> statuses; driver.reconcileTasks(statuses); - // Make sure the master received the reconcile tasks message. - AWAIT_READY(reconcileTasksMessage); + AWAIT_READY(update); + EXPECT_EQ(TASK_STAGING, update.get().state()); + EXPECT_TRUE(update.get().has_slave_id()); - // The Clock::settle() will ensure that framework would receive - // a status update if it is sent by the master. In this test it - // shouldn't receive any. - Clock::settle(); + // Now send an explicit reconciliation request for this task. + Future<TaskStatus> update2; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&update2)); - // Now send an explicit reconciliation request for this task; - // there should be no updates. TaskStatus status; status.mutable_task_id()->CopyFrom(task.task_id()); status.mutable_slave_id()->CopyFrom(slaveId); status.set_state(TASK_STAGING); statuses.push_back(status); - reconcileTasksMessage = FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _); - driver.reconcileTasks(statuses); - // Make sure the master received the reconcile tasks message. - AWAIT_READY(reconcileTasksMessage); - - // The Clock::settle() will ensure that framework would receive - // a status update if it is sent by the master. In this test it - // shouldn't receive any. - Clock::settle(); + AWAIT_READY(update2); + EXPECT_EQ(TASK_STAGING, update2.get().state()); + EXPECT_TRUE(update2.get().has_slave_id()); driver.stop(); driver.join();
