http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/hook_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/hook_tests.cpp b/src/tests/hook_tests.cpp index c4fadbb..5428782 100644 --- a/src/tests/hook_tests.cpp +++ b/src/tests/hook_tests.cpp @@ -707,9 +707,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( Invoke(&containerizer, &MockDockerContainerizer::_launch))); + Future<TaskStatus> statusStarting; Future<TaskStatus> statusRunning; Future<TaskStatus> statusFinished; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusStarting)) .WillOnce(FutureArg<1>(&statusRunning)) .WillOnce(FutureArg<1>(&statusFinished)) .WillRepeatedly(DoDefault()); @@ -717,6 +719,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( driver.launchTasks(offers.get()[0].id(), {task}); AWAIT_READY_FOR(containerId, Seconds(60)); + AWAIT_READY_FOR(statusStarting, Seconds(60)); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); AWAIT_READY_FOR(statusRunning, Seconds(60)); EXPECT_EQ(TASK_RUNNING, statusRunning->state()); AWAIT_READY_FOR(statusFinished, Seconds(60)); @@ -924,9 +928,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( Invoke(&containerizer, &MockDockerContainerizer::_launch))); + Future<TaskStatus> statusStarting; Future<TaskStatus> statusRunning; Future<TaskStatus> statusFinished; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusStarting)) .WillOnce(FutureArg<1>(&statusRunning)) .WillOnce(FutureArg<1>(&statusFinished)) .WillRepeatedly(DoDefault()); @@ -934,6 +940,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( driver.launchTasks(offers.get()[0].id(), tasks); AWAIT_READY_FOR(containerId, Seconds(60)); + AWAIT_READY_FOR(statusStarting, Seconds(60)); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); AWAIT_READY_FOR(statusRunning, Seconds(60)); EXPECT_EQ(TASK_RUNNING, statusRunning->state()); AWAIT_READY_FOR(statusFinished, Seconds(60)); @@ -1039,14 +1047,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HookTest, 
ROOT_DOCKER_VerifySlavePostFetchHook) ContainerInfo::DockerInfo* dockerInfo = containerInfo->mutable_docker(); dockerInfo->set_image("alpine"); + Future<TaskStatus> statusStarting; Future<TaskStatus> statusRunning; Future<TaskStatus> statusFinished; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusStarting)) .WillOnce(FutureArg<1>(&statusRunning)) .WillOnce(FutureArg<1>(&statusFinished)); driver.launchTasks(offers.get()[0].id(), {task}); + AWAIT_READY_FOR(statusStarting, Seconds(60)); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); + AWAIT_READY_FOR(statusRunning, Seconds(60)); EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 5d96457..c6906a7 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -2786,22 +2786,34 @@ TEST_F(MasterTest, UnreachableTaskAfterFailover) TaskInfo task = createTask(offers.get()[0], "sleep 100"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&runningStatus)); + .WillOnce(FutureArg<1>(&startingStatus)) + .WillOnce(FutureArg<1>(&runningStatus)) + .WillRepeatedly(Return()); + + Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); - Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( slave.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers.get()[0].id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + + const SlaveID slaveId = startingStatus->slave_id(); + + AWAIT_READY(statusUpdateAck1); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); - const SlaveID slaveId = runningStatus->slave_id(); - - AWAIT_READY(statusUpdateAck); + AWAIT_READY(statusUpdateAck2); // Step 4: Simulate master failover. We leave the slave without a // master so it does not attempt to re-register. 
@@ -6828,20 +6840,31 @@ TEST_F(MasterTest, FailoverAgentReregisterFirst) TaskInfo task = createTask(offers.get()[0], "sleep 100"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); - Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( + Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( slave.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers.get()[0].id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + + AWAIT_READY(statusUpdateAck1); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); - AWAIT_READY(statusUpdateAck); + AWAIT_READY(statusUpdateAck2); // Simulate master failover. We leave the scheduler without a master // so it does not attempt to re-register yet. 
@@ -7048,22 +7071,33 @@ TEST_F(MasterTest, AgentRestartNoReregister) TaskInfo task = createTask(offer, "sleep 100"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); - Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( - slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offer.id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + + AWAIT_READY(statusUpdateAck1); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); const SlaveID slaveId = runningStatus->slave_id(); - AWAIT_READY(statusUpdateAck); + AWAIT_READY(statusUpdateAck2); Clock::pause(); @@ -7416,12 +7450,18 @@ TEST_F(MasterTest, TaskWithTinyResources) Resources::parse("cpus:0.00001;mem:1").get(), SLEEP_COMMAND(1000)); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); driver.launchTasks(offer.id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); @@ -7506,12 +7546,18 @@ TEST_F(MasterTest, MultiRoleSchedulerUnsubscribeFromRole) TaskInfo task = createTask(offer.slave_id(), resources, "sleep 60"); + Future<TaskStatus> startingStatus; Future<TaskStatus> 
runningStatus; EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); driver1.launchTasks(offer.id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); @@ -7808,12 +7854,18 @@ TEST_F(MasterTest, AgentDomainDifferentRegion) // Check that we can launch a task in a remote region. TaskInfo task = createTask(offer, "sleep 60"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); driver.launchTasks(offer.id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); @@ -8409,9 +8461,11 @@ TEST_P(MasterTestPrePostReservationRefinement, LaunchGroup) v1::TaskGroupInfo taskGroup; taskGroup.add_tasks()->CopyFrom(taskInfo); - Future<v1::scheduler::Event::Update> update; + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&update)); + .WillOnce(FutureArg<1>(&startingUpdate)) + .WillOnce(FutureArg<1>(&runningUpdate)); { Call call; @@ -8433,11 +8487,17 @@ TEST_P(MasterTestPrePostReservationRefinement, LaunchGroup) mesos.send(call); } - AWAIT_READY(update); + AWAIT_READY(startingUpdate); + + EXPECT_EQ(TASK_STARTING, startingUpdate->status().state()); + EXPECT_EQ(taskInfo.task_id(), startingUpdate->status().task_id()); + EXPECT_TRUE(startingUpdate->status().has_timestamp()); + + 
AWAIT_READY(runningUpdate); - EXPECT_EQ(TASK_RUNNING, update->status().state()); - EXPECT_EQ(taskInfo.task_id(), update->status().task_id()); - EXPECT_TRUE(update->status().has_timestamp()); + EXPECT_EQ(TASK_RUNNING, runningUpdate->status().state()); + EXPECT_EQ(taskInfo.task_id(), runningUpdate->status().task_id()); + EXPECT_TRUE(runningUpdate->status().has_timestamp()); // Ensure that the task sandbox symbolic link is created. EXPECT_TRUE(os::exists(path::join( http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/master_validation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp index f00dd9b..7da1be5 100644 --- a/src/tests/master_validation_tests.cpp +++ b/src/tests/master_validation_tests.cpp @@ -1889,8 +1889,10 @@ TEST_F(TaskValidationTest, TaskReusesUnreachableTaskID) Offer offer1 = offers1.get()[0]; TaskInfo task1 = createTask(offer1, "sleep 60"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( @@ -1898,6 +1900,10 @@ TEST_F(TaskValidationTest, TaskReusesUnreachableTaskID) driver.launchTasks(offer1.id(), {task1}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task1.task_id(), startingStatus->task_id()); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task1.task_id(), runningStatus->task_id()); http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/oversubscription_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp index cd98b8f..d262bbe 100644 --- a/src/tests/oversubscription_tests.cpp +++ 
b/src/tests/oversubscription_tests.cpp @@ -757,7 +757,7 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator) AWAIT_READY(status); EXPECT_EQ(task.task_id(), status->task_id()); - EXPECT_EQ(TASK_RUNNING, status->state()); + EXPECT_EQ(TASK_STARTING, status->state()); // Advance the clock for the slave to trigger the calculation of the // total oversubscribed resources. As we described above, we don't @@ -1023,15 +1023,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill) TaskInfo task = createTask(offers.get()[0], "sleep 10"); + Future<TaskStatus> status0; Future<TaskStatus> status1; Future<TaskStatus> status2; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status0)) .WillOnce(FutureArg<1>(&status1)) .WillOnce(FutureArg<1>(&status2)) .WillRepeatedly(Return()); // Ignore subsequent updates. driver.launchTasks(offers.get()[0].id(), {task}); + AWAIT_READY(status0); + ASSERT_EQ(TASK_STARTING, status0->state()); + AWAIT_READY(status1); ASSERT_EQ(TASK_RUNNING, status1->state()); @@ -1132,15 +1137,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKillPartitionAware) TaskInfo task = createTask(offers.get()[0], "sleep 10"); + Future<TaskStatus> status0; Future<TaskStatus> status1; Future<TaskStatus> status2; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status0)) .WillOnce(FutureArg<1>(&status1)) .WillOnce(FutureArg<1>(&status2)) .WillRepeatedly(Return()); // Ignore subsequent updates. 
driver.launchTasks(offers.get()[0].id(), {task}); + AWAIT_READY(status0); + ASSERT_EQ(TASK_STARTING, status0->state()); + AWAIT_READY(status1); ASSERT_EQ(TASK_RUNNING, status1->state()); http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/partition_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp index 0597bd2..7b11264 100644 --- a/src/tests/partition_tests.cpp +++ b/src/tests/partition_tests.cpp @@ -215,22 +215,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlavePartitionAware) TaskInfo task = createTask(offer, "sleep 60"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); - Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( + Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( slave.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offer.id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + + AWAIT_READY(statusUpdateAck1); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); - const SlaveID& slaveId = runningStatus->slave_id(); + AWAIT_READY(statusUpdateAck2); - AWAIT_READY(statusUpdateAck); + const SlaveID& slaveId = startingStatus->slave_id(); // Now, induce a partition of the slave by having the master // timeout the slave. 
@@ -572,8 +583,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware) TaskInfo task = createTask(offer, "sleep 60"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( @@ -874,8 +887,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( // Launch `task1` using `sched1`. TaskInfo task1 = createTask(offer.slave_id(), taskResources, "sleep 60"); + Future<TaskStatus> startingStatus1; Future<TaskStatus> runningStatus1; EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&startingStatus1)) .WillOnce(FutureArg<1>(&runningStatus1)); Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( @@ -917,8 +932,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( // Launch the second task. TaskInfo task2 = createTask(offer.slave_id(), taskResources, "sleep 60"); + Future<TaskStatus> startingStatus2; Future<TaskStatus> runningStatus2; EXPECT_CALL(sched2, statusUpdate(&driver2, _)) + .WillOnce(FutureArg<1>(&startingStatus2)) .WillOnce(FutureArg<1>(&runningStatus2)); Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( @@ -1136,22 +1153,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, PartitionedSlaveOrphanedTask) // Launch `task` using `sched`. 
TaskInfo task = createTask(offer, "sleep 60"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); - Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( + Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( slave.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offer.id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + + AWAIT_READY(statusUpdateAck1); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); const SlaveID& slaveId = runningStatus->slave_id(); - AWAIT_READY(statusUpdateAck); + AWAIT_READY(statusUpdateAck2); // Now, induce a partition of the slave by having the master // timeout the slave. @@ -1404,22 +1432,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, DisconnectedFramework) // Launch `task` using `sched1`. 
TaskInfo task = createTask(offer, "sleep 60"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + driver1.launchTasks(offer.id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + + AWAIT_READY(statusUpdateAck1); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); const SlaveID& slaveId = runningStatus->slave_id(); - AWAIT_READY(statusUpdateAck1); + AWAIT_READY(statusUpdateAck2); // Shutdown the master. master->reset(); @@ -1573,22 +1612,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, SpuriousSlaveReregistration) // Launch `task` using `sched`. 
TaskInfo task = createTask(offer, "sleep 60"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); - Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( + Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( slave.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offer.id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + + AWAIT_READY(statusUpdateAck1); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); const SlaveID& slaveId = runningStatus->slave_id(); - AWAIT_READY(statusUpdateAck); + AWAIT_READY(statusUpdateAck2); // Simulate a master loss event at the slave and then cause the // slave to reregister with the master. From the master's http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/persistent_volume_endpoints_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp index 444737a..883192d 100644 --- a/src/tests/persistent_volume_endpoints_tests.cpp +++ b/src/tests/persistent_volume_endpoints_tests.cpp @@ -1988,16 +1988,15 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources) TaskInfo taskInfo = createTask(offer.slave_id(), taskResources, "sleep 1000"); - // Expect a TASK_RUNNING status. 
- EXPECT_CALL(sched, statusUpdate(&driver, _)); - - Future<Nothing> _statusUpdateAcknowledgement = - FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + Future<TaskStatus> starting; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&starting)) + .WillRepeatedly(Return()); // Ignore subsequent updates. driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})}); - // Wait for TASK_RUNNING update ack. - AWAIT_READY(_statusUpdateAcknowledgement); + AWAIT_READY(starting); + EXPECT_EQ(TASK_STARTING, starting->state()); // Summon an offer. EXPECT_CALL(sched, resourceOffers(&driver, _)) http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/persistent_volume_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp index 11fe432..acfeac1 100644 --- a/src/tests/persistent_volume_tests.cpp +++ b/src/tests/persistent_volume_tests.cpp @@ -806,9 +806,11 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume) taskResources, "echo abc > path1/file"); + Future<TaskStatus> status0; Future<TaskStatus> status1; Future<TaskStatus> status2; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status0)) .WillOnce(FutureArg<1>(&status1)) .WillOnce(FutureArg<1>(&status2)); @@ -823,6 +825,10 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume) {CREATE(volume), LAUNCH({task})}); + AWAIT_READY(status0); + EXPECT_EQ(task.task_id(), status0->task_id()); + EXPECT_EQ(TASK_STARTING, status0->state()); + AWAIT_READY(status1); EXPECT_EQ(task.task_id(), status1->task_id()); EXPECT_EQ(TASK_RUNNING, status1->state()); @@ -982,21 +988,25 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks) taskResources2.get() + volume, "echo task2 > path1/file2"); - // We should receive a TASK_RUNNING followed by a TASK_FINISHED for - // each of the 2 tasks. 
We do not check for the actual task state - // since it's not the primary objective of the test. We instead - // verify that the paths are created by the tasks after we receive - // enough status updates. + // We should receive a TASK_STARTING, followed by a TASK_RUNNING + // and a TASK_FINISHED for each of the 2 tasks. + // We do not check for the actual task state since it's not the + // primary objective of the test. We instead verify that the paths + // are created by the tasks after we receive enough status updates. Future<TaskStatus> status1; Future<TaskStatus> status2; Future<TaskStatus> status3; Future<TaskStatus> status4; + Future<TaskStatus> status5; + Future<TaskStatus> status6; EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status1)) .WillOnce(FutureArg<1>(&status2)) .WillOnce(FutureArg<1>(&status3)) - .WillOnce(FutureArg<1>(&status4)); + .WillOnce(FutureArg<1>(&status4)) + .WillOnce(FutureArg<1>(&status5)) + .WillOnce(FutureArg<1>(&status6)); driver.acceptOffers( {offer.id()}, @@ -1009,6 +1019,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks) AWAIT_READY(status2); AWAIT_READY(status3); AWAIT_READY(status4); + AWAIT_READY(status5); + AWAIT_READY(status6); const string& volumePath = slave::paths::getPersistentVolumePath( slaveFlags.work_dir, @@ -1258,10 +1270,12 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks) Resources::parse("cpus:1;mem:128").get() + volume, "echo abc > path1/file1 && sleep 1000"); - // We should receive a TASK_RUNNING for the launched task. + // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task. 
+ Future<TaskStatus> status0; Future<TaskStatus> status1; EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&status0)) .WillOnce(FutureArg<1>(&status1)); // We use a filter of 0 seconds so the resources will be available @@ -1275,6 +1289,9 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks) LAUNCH({task1})}, filters); + AWAIT_READY(status0); + EXPECT_EQ(TASK_STARTING, status0->state()); + AWAIT_READY(status1); EXPECT_EQ(TASK_RUNNING, status1->state()); @@ -1331,11 +1348,13 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks) Resources::parse("cpus:1;mem:256").get() + volume, "echo abc > path1/file2 && sleep 1000"); - // We should receive a TASK_RUNNING for the launched task. + // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task. Future<TaskStatus> status2; + Future<TaskStatus> status3; EXPECT_CALL(sched2, statusUpdate(&driver2, _)) - .WillOnce(FutureArg<1>(&status2)); + .WillOnce(FutureArg<1>(&status2)) + .WillOnce(FutureArg<1>(&status3)); driver2.acceptOffers( {offer2.id()}, @@ -1343,7 +1362,10 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks) filters); AWAIT_READY(status2); - EXPECT_EQ(TASK_RUNNING, status2->state()); + EXPECT_EQ(TASK_STARTING, status2->state()); + + AWAIT_READY(status3); + EXPECT_EQ(TASK_RUNNING, status3->state()); // Collect metrics based on both frameworks. Note that the `cpus_used` and // `mem_used` is updated, but `disk_used` does not change since both tasks @@ -1434,13 +1456,17 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover) taskResources.get() + volume, "sleep 1000"); - // We should receive a TASK_RUNNING for each of the tasks. + // We should receive a TASK_STARTING and a TASK_RUNNING for each of the tasks. 
Future<TaskStatus> status1; Future<TaskStatus> status2; + Future<TaskStatus> status3; + Future<TaskStatus> status4; EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status1)) - .WillOnce(FutureArg<1>(&status2)); + .WillOnce(FutureArg<1>(&status2)) + .WillOnce(FutureArg<1>(&status3)) + .WillOnce(FutureArg<1>(&status4)); Future<CheckpointResourcesMessage> checkpointResources = FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get()->pid); @@ -1451,11 +1477,14 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover) LAUNCH({task1, task2})}); AWAIT_READY(checkpointResources); + + // We only check the first and the last status, because the two in between + // could arrive in any order. AWAIT_READY(status1); - EXPECT_EQ(TASK_RUNNING, status1->state()); + EXPECT_EQ(TASK_STARTING, status1->state()); - AWAIT_READY(status2); - EXPECT_EQ(TASK_RUNNING, status2->state()); + AWAIT_READY(status4); + EXPECT_EQ(TASK_RUNNING, status4->state()); // This is to make sure CheckpointResourcesMessage is processed. Clock::pause(); @@ -1598,16 +1627,20 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks) EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillOnce(FutureArg<1>(&offers)); - // We should receive a TASK_RUNNING each of the 2 tasks. We track task - // termination by a TASK_FINISHED for the short-lived task. + // We should receive a TASK_STARTING and a TASK_RUNNING each of the 2 tasks. + // We track task termination by a TASK_FINISHED for the short-lived task. 
Future<TaskStatus> status1; Future<TaskStatus> status2; Future<TaskStatus> status3; + Future<TaskStatus> status4; + Future<TaskStatus> status5; EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status1)) .WillOnce(FutureArg<1>(&status2)) - .WillOnce(FutureArg<1>(&status3)); + .WillOnce(FutureArg<1>(&status3)) + .WillOnce(FutureArg<1>(&status4)) + .WillOnce(FutureArg<1>(&status5)); driver.acceptOffers( {offer.id()}, @@ -1616,18 +1649,23 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks) LAUNCH({task1, task2})}, filters); - // Wait for TASK_RUNNING for both the tasks, and TASK_FINISHED for - // the short-lived task. + // Wait for TASK_STARTING and TASK_RUNNING for both the tasks, + // and TASK_FINISHED for the short-lived task. AWAIT_READY(status1); AWAIT_READY(status2); AWAIT_READY(status3); + AWAIT_READY(status4); + AWAIT_READY(status5); hashset<TaskID> tasksRunning; hashset<TaskID> tasksFinished; - vector<Future<TaskStatus>> statuses{status1, status2, status3}; + vector<Future<TaskStatus>> statuses { + status1, status2, status3, status4, status5}; foreach (const Future<TaskStatus>& status, statuses) { - if (status->state() == TASK_RUNNING) { + if (status->state() == TASK_STARTING) { + // ignore + } else if (status->state() == TASK_RUNNING) { tasksRunning.insert(status->task_id()); } else { tasksFinished.insert(status->task_id()); @@ -1686,15 +1724,15 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks) // We kill the long-lived task and wait for TASK_KILLED, so we can // DESTROY the persistent volume once the task terminates. 
- Future<TaskStatus> status4; + Future<TaskStatus> status6; EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status4)); + .WillOnce(FutureArg<1>(&status6)); driver.killTask(task1.task_id()); - AWAIT_READY(status4); - EXPECT_EQ(task1.task_id(), status4->task_id()); - EXPECT_EQ(TASK_KILLED, status4->state()); + AWAIT_READY(status6); + EXPECT_EQ(task1.task_id(), status6->task_id()); + EXPECT_EQ(TASK_KILLED, status6->state()); EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillOnce(FutureArg<1>(&offers)); @@ -1923,25 +1961,37 @@ TEST_P(PersistentVolumeTest, SlaveRecovery) taskResources, "while true; do test -d path1; done"); + Future<TaskStatus> status0; Future<TaskStatus> status1; Future<TaskStatus> status2; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status0)) .WillOnce(FutureArg<1>(&status1)) .WillOnce(FutureArg<1>(&status2)); - Future<Nothing> ack = + Future<Nothing> ack1 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> ack2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver.acceptOffers( {offer.id()}, {CREATE(volume), LAUNCH({task})}); + AWAIT_READY(status0); + EXPECT_EQ(task.task_id(), status0->task_id()); + EXPECT_EQ(TASK_STARTING, status0->state()); + + // Wait for the ACK to be checkpointed. + AWAIT_READY(ack1); + AWAIT_READY(status1); EXPECT_EQ(task.task_id(), status1->task_id()); EXPECT_EQ(TASK_RUNNING, status1->state()); // Wait for the ACK to be checkpointed. - AWAIT_READY(ack); + AWAIT_READY(ack2); // Restart the slave. 
slave.get()->terminate(); http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index 64a1d3d..8ae2860 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -1178,22 +1178,33 @@ TEST_F(ReconciliationTest, PartitionedAgentThenMasterFailover) // Launch `task` using `sched`. TaskInfo task = createTask(offer, "sleep 60"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); - Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( + Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( slave.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offer.id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + + AWAIT_READY(statusUpdateAck1); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); const SlaveID slaveId = runningStatus->slave_id(); - AWAIT_READY(statusUpdateAck); + AWAIT_READY(statusUpdateAck2); // Now, induce a partition of the slave by having the master // timeout the slave. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/reservation_endpoints_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp index e70dd0d..3645cd8 100644 --- a/src/tests/reservation_endpoints_tests.cpp +++ b/src/tests/reservation_endpoints_tests.cpp @@ -378,6 +378,7 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources) MesosSchedulerDriver driver( &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + // Expect one TASK_STARTING and one TASK_RUNNING update EXPECT_CALL(sched, registered(&driver, _, _)); Future<vector<Offer>> offers; @@ -407,16 +408,21 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources) // recovers 'offered' resources portion. TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000"); - // Expect a TASK_RUNNING status. - EXPECT_CALL(sched, statusUpdate(_, _)); + // Expect a TASK_STARTING and a TASK_RUNNING status. + EXPECT_CALL(sched, statusUpdate(_, _)). + WillRepeatedly(Return()); - Future<Nothing> _statusUpdateAcknowledgement = + Future<Nothing> _statusUpdateAcknowledgement1 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _statusUpdateAcknowledgement2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})}); - // Wait for TASK_RUNNING update ack. - AWAIT_READY(_statusUpdateAcknowledgement); + // Wait for update acks. + AWAIT_READY(_statusUpdateAcknowledgement1); + AWAIT_READY(_statusUpdateAcknowledgement2); // Summon an offer to receive the 'offered' resources. EXPECT_CALL(sched, resourceOffers(&driver, _)) @@ -550,16 +556,21 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources) // recovers 'offered' resources portion. TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000"); - // Expect a TASK_RUNNING status. 
- EXPECT_CALL(sched, statusUpdate(_, _)); + // Expect a TASK_STARTING and a TASK_RUNNING status. + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillRepeatedly(Return()); - Future<Nothing> _statusUpdateAcknowledgement = + Future<Nothing> _statusUpdateAcknowledgement1 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _statusUpdateAcknowledgement2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})}); - // Wait for TASK_RUNNING update ack. - AWAIT_READY(_statusUpdateAcknowledgement); + // Wait for update acks from TASK_STARTING and TASK_RUNNING. + AWAIT_READY(_statusUpdateAcknowledgement1); + AWAIT_READY(_statusUpdateAcknowledgement2); // Summon an offer to receive the 'offered' resources. EXPECT_CALL(sched, resourceOffers(&driver, _)) @@ -1575,9 +1586,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources) Offer offer = offers.get()[0]; - Future<TaskStatus> status; + Future<TaskStatus> statusStarting; + Future<TaskStatus> statusRunning; EXPECT_CALL(sched, statusUpdate(_, _)) - .WillOnce(FutureArg<1>(&status)); + .WillOnce(FutureArg<1>(&statusStarting)) + .WillOnce(FutureArg<1>(&statusRunning)); Resources taskResources = Resources::parse( "cpus(role):2;mem(role):512;cpus:2;mem:1024").get(); @@ -1586,8 +1599,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources) driver.acceptOffers({offer.id()}, {LAUNCH({task})}); - AWAIT_READY(status); - ASSERT_EQ(TASK_RUNNING, status->state()); + AWAIT_READY(statusStarting); + ASSERT_EQ(TASK_STARTING, statusStarting->state()); + + AWAIT_READY(statusRunning); + ASSERT_EQ(TASK_RUNNING, statusRunning->state()); Future<Response> response = process::http::get( agent.get()->pid, http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/role_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/role_tests.cpp b/src/tests/role_tests.cpp index 568ea90..084555a 100644 
--- a/src/tests/role_tests.cpp +++ b/src/tests/role_tests.cpp @@ -979,9 +979,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies) taskResources, "! (ls -Av path | grep -q .)"); - // We expect two status updates for the task. - Future<TaskStatus> status1, status2; + // We expect three status updates for the task. + Future<TaskStatus> status0, status1, status2; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status0)) .WillOnce(FutureArg<1>(&status1)) .WillOnce(FutureArg<1>(&status2)); @@ -990,6 +991,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies) {offer.id()}, {RESERVE(reservedDisk), CREATE(volume), LAUNCH({task})}); + AWAIT_READY(status0); + + EXPECT_EQ(task.task_id(), status0->task_id()); + EXPECT_EQ(TASK_STARTING, status0->state()); + AWAIT_READY(status1); EXPECT_EQ(task.task_id(), status1->task_id()); http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/scheduler_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp index 4eda96e..6df4d32 100644 --- a/src/tests/scheduler_tests.cpp +++ b/src/tests/scheduler_tests.cpp @@ -623,11 +623,15 @@ TEST_P(SchedulerTest, TaskGroupRunning) taskGroup.add_tasks()->CopyFrom(task1); taskGroup.add_tasks()->CopyFrom(task2); + Future<Event::Update> startingUpdate1; + Future<Event::Update> startingUpdate2; Future<Event::Update> runningUpdate1; Future<Event::Update> runningUpdate2; Future<Event::Update> finishedUpdate1; Future<Event::Update> finishedUpdate2; EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&startingUpdate1)) + .WillOnce(FutureArg<1>(&startingUpdate2)) .WillOnce(FutureArg<1>(&runningUpdate1)) .WillOnce(FutureArg<1>(&runningUpdate2)) .WillOnce(FutureArg<1>(&finishedUpdate1)) @@ -669,14 +673,58 @@ TEST_P(SchedulerTest, TaskGroupRunning) EXPECT_EQ(devolve(task2.task_id()), 
runTaskGroupMessage->task_group().tasks(1).task_id()); + AWAIT_READY(startingUpdate1); + ASSERT_EQ(v1::TASK_STARTING, startingUpdate1->status().state()); + + AWAIT_READY(startingUpdate2); + ASSERT_EQ(v1::TASK_STARTING, startingUpdate2->status().state()); + + const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()}; + + // TASK_STARTING updates for the tasks in a + // task group can be received in any order. + const hashset<v1::TaskID> tasksStarting{ + startingUpdate1->status().task_id(), + startingUpdate2->status().task_id()}; + + ASSERT_EQ(tasks, tasksStarting); + + // Acknowledge the TASK_STARTING updates so + // that subsequent updates can be received. + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_task_id()->CopyFrom( + startingUpdate1->status().task_id()); + acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id()); + acknowledge->set_uuid(startingUpdate1->status().uuid()); + + mesos.send(call); + } + + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_task_id()->CopyFrom( + startingUpdate2->status().task_id()); + acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id()); + acknowledge->set_uuid(startingUpdate2->status().uuid()); + + mesos.send(call); + } + AWAIT_READY(runningUpdate1); ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state()); AWAIT_READY(runningUpdate2); ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state()); - const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()}; - // TASK_RUNNING updates for the tasks in a // task group can be received in any order. 
const hashset<v1::TaskID> tasksRunning{ http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_authorization_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp index 868e39e..2dcfd6c 100644 --- a/src/tests/slave_authorization_tests.cpp +++ b/src/tests/slave_authorization_tests.cpp @@ -627,10 +627,12 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent) // The first task should fail since the task user `foo` is not an // authorized user that can launch a task. However, the second task // should succeed. + Future<TaskStatus> status0; Future<TaskStatus> status1; Future<TaskStatus> status2; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status0)) .WillOnce(FutureArg<1>(&status1)) .WillOnce(FutureArg<1>(&status2)); @@ -638,16 +640,19 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent) {offer.id()}, {LAUNCH({task1, task2})}); - // Wait for TASK_FAILED for 1st task, and TASK_RUNNING for 2nd task. + // Wait for TASK_FAILED for 1st task, and TASK_STARTING followed by + // TASK_RUNNING for 2nd task. + AWAIT_READY(status0); AWAIT_READY(status1); AWAIT_READY(status2); // Validate both the statuses. Note that the order of receiving the - // status updates for the 2 tasks is not deterministic. - hashmap<TaskID, TaskStatus> statuses { - {status1->task_id(), status1.get()}, - {status2->task_id(), status2.get()} - }; + // status updates for the 2 tasks is not deterministic, but we know + // that task2's TASK_RUNNING arrives after TASK_STARTING.
+ hashmap<TaskID, TaskStatus> statuses; + statuses[status0->task_id()] = status0.get(); + statuses[status1->task_id()] = status1.get(); + statuses[status2->task_id()] = status2.get(); ASSERT_TRUE(statuses.contains(task1.task_id())); EXPECT_EQ(TASK_ERROR, statuses.at(task1.task_id()).state()); @@ -741,7 +746,7 @@ TEST_F(ExecutorAuthorizationTest, RunTaskGroup) AWAIT_READY(status); ASSERT_EQ(task.task_id(), status->task_id()); - EXPECT_EQ(TASK_RUNNING, status->state()); + EXPECT_EQ(TASK_STARTING, status->state()); driver.stop(); driver.join(); http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 30d8c23..c2d9cc8 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -252,7 +252,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) // Capture the update. AWAIT_READY(update); - EXPECT_EQ(TASK_RUNNING, update->update().status().state()); + EXPECT_EQ(TASK_STARTING, update->update().status().state()); // Wait for the ACK to be checkpointed. AWAIT_READY(_statusUpdateAcknowledgement); @@ -314,7 +314,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) .info); // Check status update and ack. - ASSERT_EQ( + // (the number might be bigger than 1 because we might have + // received any number of additional TASK_RUNNING updates) + ASSERT_LE( 1U, state .frameworks[frameworkId] @@ -423,7 +425,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager) ASSERT_SOME(slave); AWAIT_READY(status); - EXPECT_EQ(TASK_RUNNING, status->state()); + EXPECT_EQ(TASK_STARTING, status->state()); driver.stop(); driver.join(); @@ -514,7 +516,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_ReconnectHTTPExecutor) // Scheduler should receive the recovered update. 
AWAIT_READY(status); - EXPECT_EQ(TASK_RUNNING, status->state()); + EXPECT_EQ(TASK_STARTING, status->state()); driver.stop(); driver.join(); @@ -762,13 +764,13 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor) TaskInfo task = createTask(offers.get()[0], "sleep 1000"); // Drop the first update from the executor. - Future<StatusUpdateMessage> statusUpdate = + Future<StatusUpdateMessage> startingUpdate = DROP_PROTOBUF(StatusUpdateMessage(), _, _); driver.launchTasks(offers.get()[0].id(), {task}); // Stop the slave before the status update is received. - AWAIT_READY(statusUpdate); + AWAIT_READY(startingUpdate); slave.get()->terminate(); @@ -791,15 +793,15 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor) // Ensure the executor re-registers. AWAIT_READY(reregister); - // Executor should inform about the unacknowledged update. - ASSERT_EQ(1, reregister->updates_size()); + // Executor should inform about the unacknowledged updates. + ASSERT_LE(1, reregister->updates_size()); const StatusUpdate& update = reregister->updates(0); EXPECT_EQ(task.task_id(), update.status().task_id()); - EXPECT_EQ(TASK_RUNNING, update.status().state()); + EXPECT_EQ(TASK_STARTING, update.status().state()); // Scheduler should receive the recovered update. AWAIT_READY(status); - EXPECT_EQ(TASK_RUNNING, status->state()); + EXPECT_EQ(TASK_STARTING, status->state()); driver.stop(); driver.join(); @@ -846,7 +848,8 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry) Future<TaskStatus> statusUpdate; EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&statusUpdate)); + .WillOnce(FutureArg<1>(&statusUpdate)) + .WillRepeatedly(Return()); // Ignore subsequent TASK_RUNNING updates. 
driver.start(); @@ -863,7 +866,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry) driver.launchTasks(offers.get()[0].id(), {task}); AWAIT_READY(statusUpdate); - EXPECT_EQ(TASK_RUNNING, statusUpdate->state()); + EXPECT_EQ(TASK_STARTING, statusUpdate->state()); // Ensure the acknowledgement is checkpointed. Clock::settle(); @@ -974,12 +977,19 @@ TYPED_TEST(SlaveRecoveryTest, PingTimeoutDuringRecovery) TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + Future<TaskStatus> statusUpdate0; Future<TaskStatus> statusUpdate1; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusUpdate0)) .WillOnce(FutureArg<1>(&statusUpdate1)); driver.launchTasks(offers.get()[0].id(), {task}); + AWAIT_READY(statusUpdate0); + ASSERT_EQ(TASK_STARTING, statusUpdate0->state()); + + driver.acknowledgeStatusUpdate(statusUpdate0.get()); + AWAIT_READY(statusUpdate1); ASSERT_EQ(TASK_RUNNING, statusUpdate1->state()); @@ -1442,7 +1452,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedHTTPExecutor) Future<vector<Offer>> offers1; EXPECT_CALL(sched, resourceOffers(_, _)) - .WillOnce(FutureArg<1>(&offers1)); + .WillOnce(FutureArg<1>(&offers1)) + .WillRepeatedly(Return()); // Ignore subsequent offers. driver.start(); @@ -1451,7 +1462,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedHTTPExecutor) TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); - EXPECT_CALL(sched, statusUpdate(_, _)); + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillRepeatedly(Return()); // Allow any number of subsequent status updates. Future<Nothing> ack = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); @@ -1583,7 +1595,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor) Future<vector<Offer>> offers1; EXPECT_CALL(sched, resourceOffers(_, _)) - .WillOnce(FutureArg<1>(&offers1)); + .WillOnce(FutureArg<1>(&offers1)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
driver.start(); @@ -1595,25 +1608,42 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor) Future<Message> registerExecutor = FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); - EXPECT_CALL(sched, statusUpdate(_, _)); - Future<Nothing> ack = + Future<TaskStatus> statusStarting; + Future<TaskStatus> statusRunning; + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillOnce(FutureArg<1>(&statusStarting)) + .WillOnce(FutureArg<1>(&statusRunning)) + .WillRepeatedly(Return()); // Ignore subsequent status updates. + + Future<Nothing> ackRunning = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + Future<Nothing> ackStarting = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers1.get()[0].id(), {task}); + // Wait for the TASK_STARTING update from the executor + AWAIT_READY(statusStarting); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); + + // Wait for the TASK_RUNNING update from the executor + AWAIT_READY(statusRunning); + EXPECT_EQ(TASK_RUNNING, statusRunning->state()); + // Capture the executor pid. AWAIT_READY(registerExecutor); UPID executorPid = registerExecutor->from; // Wait for the ACK to be checkpointed. - AWAIT_READY(ack); + AWAIT_READY(ackStarting); + AWAIT_READY(ackRunning); slave.get()->terminate(); - Future<TaskStatus> status; + Future<TaskStatus> statusLost; EXPECT_CALL(sched, statusUpdate(_, _)) - .WillOnce(FutureArg<1>(&status)) + .WillOnce(FutureArg<1>(&statusLost)) .WillRepeatedly(Return()); // Ignore subsequent status updates. // Now shut down the executor, when the slave is down. @@ -1644,18 +1674,18 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor) Clock::advance(flags.executor_reregistration_timeout); // Now advance time until the reaper reaps the executor. - while (status.isPending()) { + while (statusLost.isPending()) { Clock::advance(process::MAX_REAP_INTERVAL()); Clock::settle(); } // Scheduler should receive the TASK_LOST update. 
- AWAIT_READY(status); + AWAIT_READY(statusLost); - EXPECT_EQ(TASK_LOST, status->state()); - EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); + EXPECT_EQ(TASK_LOST, statusLost->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, statusLost->source()); EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT, - status->reason()); + statusLost->reason()); while (offers2.isPending()) { Clock::advance(Seconds(1)); @@ -1816,7 +1846,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor) TaskInfo task = createTask(offers1.get()[0], "exit 0"); EXPECT_CALL(sched, statusUpdate(_, _)) - .Times(2); // TASK_RUNNING and TASK_FINISHED updates. + .Times(3); // TASK_STARTING, TASK_RUNNING and TASK_FINISHED updates. EXPECT_CALL(sched, offerRescinded(_, _)) .Times(AtMost(1)); @@ -2010,15 +2040,22 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor) TaskInfo task = createTask(offers.get()[0], "sleep 1000"); - EXPECT_CALL(sched, statusUpdate(_, _)); + // Expect TASK_STARTING and TASK_RUNNING updates + EXPECT_CALL(sched, statusUpdate(_, _)) + .Times(2) + .WillRepeatedly(Return()); // Ignore subsequent updates - Future<Nothing> ack = + Future<Nothing> ackRunning = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> ackStarting = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers.get()[0].id(), {task}); // Wait for the ACK to be checkpointed. 
- AWAIT_READY(ack); + AWAIT_READY(ackStarting); + AWAIT_READY(ackRunning); slave.get()->terminate(); @@ -2129,17 +2166,23 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework) Resources(offer1.resources()) + Resources(offer2.resources()))); + Future<Nothing> update0; Future<Nothing> update1; Future<Nothing> update2; + Future<Nothing> update3; EXPECT_CALL(sched, statusUpdate(_, _)) + .WillOnce(FutureSatisfy(&update0)) .WillOnce(FutureSatisfy(&update1)) - .WillOnce(FutureSatisfy(&update2)); + .WillOnce(FutureSatisfy(&update2)) + .WillOnce(FutureSatisfy(&update3)); driver.launchTasks(offers.get()[0].id(), tasks); - // Wait for TASK_RUNNING updates from the tasks. + // Wait for TASK_STARTING and TASK_RUNNING updates from the tasks. + AWAIT_READY(update0); AWAIT_READY(update1); AWAIT_READY(update2); + AWAIT_READY(update3); // The master should generate TASK_LOST updates once the slave is stopped. Future<TaskStatus> status1; @@ -2439,15 +2482,21 @@ TYPED_TEST(SlaveRecoveryTest, KillTask) TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); - EXPECT_CALL(sched, statusUpdate(_, _)); + // Expect a TASK_STARTING and a TASK_RUNNING update + EXPECT_CALL(sched, statusUpdate(_, _)) + .Times(2); - Future<Nothing> ack = + Future<Nothing> ack1 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> ack2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers1.get()[0].id(), {task}); // Wait for the ACK to be checkpointed. 
- AWAIT_READY(ack); + AWAIT_READY(ack1); + AWAIT_READY(ack2); slave.get()->terminate(); @@ -3068,9 +3117,10 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave) TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); - Future<Nothing> statusUpdate1; + Future<Nothing> statusUpdate1, statusUpdate2; EXPECT_CALL(sched, statusUpdate(_, _)) - .WillOnce(FutureSatisfy(&statusUpdate1)) + .WillOnce(FutureSatisfy(&statusUpdate1)) // TASK_STARTING + .WillOnce(FutureSatisfy(&statusUpdate2)) // TASK_RUNNING .WillOnce(Return()); // Ignore TASK_FAILED update. Future<Message> registerExecutor = @@ -3082,7 +3132,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave) AWAIT_READY(registerExecutor); UPID executorPid = registerExecutor->from; - AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update. + AWAIT_READY(statusUpdate2); // Wait for TASK_RUNNING update. EXPECT_CALL(sched, offerRescinded(_, _)) .Times(AtMost(1)); @@ -3186,18 +3236,22 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1) TaskInfo task = createTask(offers.get()[0], "sleep 1000"); - Future<TaskStatus> status; + Future<TaskStatus> statusStarting, statusRunning; EXPECT_CALL(sched, statusUpdate(_, _)) - .WillOnce(FutureArg<1>(&status)); + .WillOnce(FutureArg<1>(&statusStarting)) + .WillOnce(FutureArg<1>(&statusRunning)); driver.launchTasks(offers.get()[0].id(), {task}); - AWAIT_READY(status); - EXPECT_EQ(TASK_RUNNING, status->state()); + AWAIT_READY(statusStarting); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); - Future<TaskStatus> status2; + AWAIT_READY(statusRunning); + EXPECT_EQ(TASK_RUNNING, statusRunning->state()); + + Future<TaskStatus> statusLost; EXPECT_CALL(sched, statusUpdate(_, _)) - .WillOnce(FutureArg<1>(&status2)); + .WillOnce(FutureArg<1>(&statusLost)); Future<Nothing> slaveLost; EXPECT_CALL(sched, slaveLost(_, _)) @@ -3223,11 +3277,11 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1) AWAIT_READY(executorTerminated); // The master should send a TASK_LOST and slaveLost. 
- AWAIT_READY(status2); + AWAIT_READY(statusLost); - EXPECT_EQ(TASK_LOST, status2->state()); - EXPECT_EQ(TaskStatus::SOURCE_MASTER, status2->source()); - EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status2->reason()); + EXPECT_EQ(TASK_LOST, statusLost->state()); + EXPECT_EQ(TaskStatus::SOURCE_MASTER, statusLost->source()); + EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, statusLost->reason()); AWAIT_READY(slaveLost); @@ -3415,16 +3469,21 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask) SlaveID slaveId = offers1.get()[0].slave_id(); FrameworkID frameworkId = offers1.get()[0].framework_id(); - // Expecting TASK_RUNNING status. - EXPECT_CALL(sched, statusUpdate(_, _)); + // Expecting TASK_STARTING and TASK_RUNNING status. + EXPECT_CALL(sched, statusUpdate(_, _)) + .Times(2); - Future<Nothing> _statusUpdateAcknowledgement = + Future<Nothing> _statusUpdateAcknowledgement1 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _statusUpdateAcknowledgement2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers1.get()[0].id(), {task}); // Wait for TASK_RUNNING update to be acknowledged. - AWAIT_READY(_statusUpdateAcknowledgement); + AWAIT_READY(_statusUpdateAcknowledgement1); + AWAIT_READY(_statusUpdateAcknowledgement2); slave.get()->terminate(); @@ -3513,18 +3572,24 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework) // Capture the framework id. FrameworkID frameworkId = offers.get()[0].framework_id(); - // Expecting TASK_RUNNING status. - EXPECT_CALL(sched, statusUpdate(_, _)); + // Expecting a TASK_STARTING and a TASK_RUNNING status. 
+ EXPECT_CALL(sched, statusUpdate(_, _)) + .Times(2) + .WillRepeatedly(Return()); - Future<Nothing> _statusUpdateAcknowledgement = + Future<Nothing> _statusUpdateAcknowledgement1 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _statusUpdateAcknowledgement2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); TaskInfo task = createTask(offers.get()[0], "sleep 1000"); driver.launchTasks(offers.get()[0].id(), {task}); - // Wait for TASK_RUNNING update to be acknowledged. - AWAIT_READY(_statusUpdateAcknowledgement); + // Wait for the updates to be acknowledged. + AWAIT_READY(_statusUpdateAcknowledgement1); + AWAIT_READY(_statusUpdateAcknowledgement2); slave.get()->terminate(); @@ -3661,15 +3726,30 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave) // re-registers by wiping the relevant meta directory. TaskInfo task = createTask(offers1.get()[0], "sleep 10"); - EXPECT_CALL(sched, statusUpdate(_, _)); + Future<TaskStatus> starting; + Future<TaskStatus> running; + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillOnce(FutureArg<1>(&starting)) + .WillOnce(FutureArg<1>(&running)) + .WillRepeatedly(Return()); // Ignore subsequent updates. - Future<Nothing> _statusUpdateAcknowledgement = + Future<Nothing> startingAck = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> runningAck = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers1.get()[0].id(), {task}); // Wait for the ACK to be checkpointed. - AWAIT_READY(_statusUpdateAcknowledgement); + AWAIT_READY(starting); + AWAIT_READY(startingAck); + + AWAIT_READY(running); + AWAIT_READY(runningAck); + + EXPECT_EQ(TASK_STARTING, starting->state()); + EXPECT_EQ(TASK_RUNNING, running->state()); EXPECT_CALL(allocator, deactivateSlave(_)); @@ -3824,15 +3904,21 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover) // Create a long running task. 
TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); - EXPECT_CALL(sched1, statusUpdate(_, _)); + // Expecting TASK_STARTING and TASK_RUNNING updates + EXPECT_CALL(sched1, statusUpdate(_, _)) + .Times(2); - Future<Nothing> _statusUpdateAcknowledgement = + Future<Nothing> _statusUpdateAcknowledgement1 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _statusUpdateAcknowledgement2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver1.launchTasks(offers1.get()[0].id(), {task}); // Wait for the ACK to be checkpointed. - AWAIT_READY(_statusUpdateAcknowledgement); + AWAIT_READY(_statusUpdateAcknowledgement1); + AWAIT_READY(_statusUpdateAcknowledgement2); slave.get()->terminate(); @@ -3975,15 +4061,20 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover) TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); - EXPECT_CALL(sched, statusUpdate(_, _)); + EXPECT_CALL(sched, statusUpdate(_, _)) + .Times(2); // TASK_STARTING and TASK_RUNNING - Future<Nothing> _statusUpdateAcknowledgement = + Future<Nothing> _statusUpdateAcknowledgement1 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _statusUpdateAcknowledgement2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers1.get()[0].id(), {task}); - // Wait for the ACK to be checkpointed. - AWAIT_READY(_statusUpdateAcknowledgement); + // Wait for both ACKs to be checkpointed. + AWAIT_READY(_statusUpdateAcknowledgement1); + AWAIT_READY(_statusUpdateAcknowledgement2); slave.get()->terminate(); @@ -4117,15 +4208,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks) // Framework 1 launches a task. 
TaskInfo task1 = createTask(offer1, "sleep 1000"); - EXPECT_CALL(sched1, statusUpdate(_, _)); + EXPECT_CALL(sched1, statusUpdate(_, _)) + .Times(2); - Future<Nothing> _statusUpdateAcknowledgement1 = + Future<Nothing> _startingStatusUpdateAcknowledgement1 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _runningStatusUpdateAcknowledgement1 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); driver1.launchTasks(offer1.id(), {task1}); // Wait for the ACK to be checkpointed. - AWAIT_READY(_statusUpdateAcknowledgement1); + AWAIT_READY(_startingStatusUpdateAcknowledgement1); + AWAIT_READY(_runningStatusUpdateAcknowledgement1); // Framework 2. Enable checkpointing. FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO; @@ -4150,14 +4246,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks) // Framework 2 launches a task. TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); - EXPECT_CALL(sched2, statusUpdate(_, _)); + EXPECT_CALL(sched2, statusUpdate(_, _)) + .Times(2); - Future<Nothing> _statusUpdateAcknowledgement2 = + Future<Nothing> _startingStatusUpdateAcknowledgement2 = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _runningStatusUpdateAcknowledgement2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + driver2.launchTasks(offers2.get()[0].id(), {task2}); // Wait for the ACK to be checkpointed. - AWAIT_READY(_statusUpdateAcknowledgement2); + AWAIT_READY(_startingStatusUpdateAcknowledgement2); + AWAIT_READY(_runningStatusUpdateAcknowledgement2); slave.get()->terminate(); @@ -4281,15 +4383,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves) // Launch a long running task in the first slave. 
TaskInfo task1 = createTask(offers1.get()[0], "sleep 1000"); - EXPECT_CALL(sched, statusUpdate(_, _)); + EXPECT_CALL(sched, statusUpdate(_, _)) + .Times(2); - Future<Nothing> _statusUpdateAcknowledgement1 = + Future<Nothing> _startingStatusUpdateAcknowledgement1 = + FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _runningStatusUpdateAcknowledgement1 = FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers1.get()[0].id(), {task1}); // Wait for the ACK to be checkpointed. - AWAIT_READY(_statusUpdateAcknowledgement1); + AWAIT_READY(_startingStatusUpdateAcknowledgement1); + AWAIT_READY(_runningStatusUpdateAcknowledgement1); Future<vector<Offer>> offers2; EXPECT_CALL(sched, resourceOffers(&driver, _)) @@ -4316,15 +4423,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves) // Launch a long running task in each slave. TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); - EXPECT_CALL(sched, statusUpdate(_, _)); + EXPECT_CALL(sched, statusUpdate(_, _)) + .Times(2); - Future<Nothing> _statusUpdateAcknowledgement2 = + Future<Nothing> _startingStatusUpdateAcknowledgement2 = + FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> _runningStatusUpdateAcknowledgement2 = FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers2.get()[0].id(), {task2}); // Wait for the ACKs to be checkpointed. 
- AWAIT_READY(_statusUpdateAcknowledgement2); + AWAIT_READY(_startingStatusUpdateAcknowledgement2); + AWAIT_READY(_runningStatusUpdateAcknowledgement2); slave1.get()->terminate(); slave2.get()->terminate(); http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 76a157f..def64b5 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -680,16 +680,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, CommandTaskWithArguments) task.mutable_command()->MergeFrom(command); + Future<TaskStatus> statusStarting; Future<TaskStatus> statusRunning; Future<TaskStatus> statusFinished; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusStarting)) .WillOnce(FutureArg<1>(&statusRunning)) .WillOnce(FutureArg<1>(&statusFinished)); driver.launchTasks(offers.get()[0].id(), {task}); - // Scheduler should first receive TASK_RUNNING followed by the - // TASK_FINISHED from the executor. + // Scheduler should first receive TASK_STARTING, followed by + // TASK_RUNNING and TASK_FINISHED from the executor. 
+ AWAIT_READY(statusStarting); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source()); + AWAIT_READY(statusRunning); EXPECT_EQ(TASK_RUNNING, statusRunning->state()); EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source()); @@ -746,12 +752,17 @@ TEST_F(SlaveTest, CommandTaskWithKillPolicy) task.mutable_kill_policy()->mutable_grace_period()->set_nanoseconds( gracePeriod.ns()); + Future<TaskStatus> statusStarting; Future<TaskStatus> statusRunning; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusStarting)) .WillOnce(FutureArg<1>(&statusRunning)); driver.launchTasks(offer.id(), {task}); + AWAIT_READY(statusStarting); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); + AWAIT_READY(statusRunning); EXPECT_EQ(TASK_RUNNING, statusRunning->state()); @@ -1051,16 +1062,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( task.mutable_command()->MergeFrom(command); + Future<TaskStatus> statusStarting; Future<TaskStatus> statusRunning; Future<TaskStatus> statusFinished; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusStarting)) .WillOnce(FutureArg<1>(&statusRunning)) .WillOnce(FutureArg<1>(&statusFinished)); driver.launchTasks(offers.get()[0].id(), {task}); - // Scheduler should first receive TASK_RUNNING followed by the - // TASK_FINISHED from the executor. + // Scheduler should first receive TASK_STARTING followed by + // TASK_RUNNING and TASK_FINISHED from the executor. 
+ AWAIT_READY(statusStarting); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source()); + AWAIT_READY(statusRunning); EXPECT_EQ(TASK_RUNNING, statusRunning->state()); EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source()); @@ -2381,15 +2398,21 @@ TEST_F(SlaveTest, StatisticsEndpointRunningExecutor) Resources::parse("cpus:1;mem:32").get(), SLEEP_COMMAND(1000)); - Future<TaskStatus> status; + Future<TaskStatus> statusStarting; + Future<TaskStatus> statusRunning; EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status)); + .WillOnce(FutureArg<1>(&statusStarting)) + .WillOnce(FutureArg<1>(&statusRunning)); driver.launchTasks(offer.id(), {task}); - AWAIT_READY(status); - EXPECT_EQ(task.task_id(), status->task_id()); - EXPECT_EQ(TASK_RUNNING, status->state()); + AWAIT_READY(statusStarting); + EXPECT_EQ(task.task_id(), statusStarting->task_id()); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); + + AWAIT_READY(statusRunning); + EXPECT_EQ(task.task_id(), statusRunning->task_id()); + EXPECT_EQ(TASK_RUNNING, statusRunning->state()); // Hit the statistics endpoint and expect the response contains the // resource statistics for the running container. @@ -5207,16 +5230,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorEnvironmentVariables) task.mutable_command()->MergeFrom(command); + Future<TaskStatus> statusStarting; Future<TaskStatus> statusRunning; Future<TaskStatus> statusFinished; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusStarting)) .WillOnce(FutureArg<1>(&statusRunning)) .WillOnce(FutureArg<1>(&statusFinished)); driver.launchTasks(offers.get()[0].id(), {task}); - // Scheduler should first receive TASK_RUNNING followed by the - // TASK_FINISHED from the executor. + // Scheduler should first receive TASK_STARTING, followed by + // TASK_RUNNING and TASK_FINISHED from the executor. 
+ AWAIT_READY(statusStarting); + EXPECT_EQ(TASK_STARTING, statusStarting->state()); + AWAIT_READY(statusRunning); EXPECT_EQ(TASK_RUNNING, statusRunning->state()); @@ -5562,7 +5590,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, HTTPSchedulerSlaveRestart) UPID executorPid = registerExecutorMessage->from; AWAIT_READY(status); - EXPECT_EQ(TASK_RUNNING, status->state()); + EXPECT_EQ(TASK_STARTING, status->state()); // Restart the slave. slave.get()->terminate(); @@ -6829,14 +6857,23 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( AWAIT_READY(offers); ASSERT_FALSE(offers->offers().empty()); - Future<v1::scheduler::Event::Update> update; - - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&update)); - const v1::Offer offer = offers->offers(0); const v1::AgentID& agentId = offer.agent_id(); + Future<v1::scheduler::Event::Update> updateStarting; + Future<v1::scheduler::Event::Update> updateRunning; + + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce( + DoAll( + FutureArg<1>(&updateStarting), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce( + DoAll( + FutureArg<1>(&updateRunning), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(Return()); // Ignore subsequent updates. 
+ v1::Resources resources = v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); @@ -6878,28 +6915,15 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( mesos.send(call); } - AWAIT_READY(update); - - ASSERT_EQ(TASK_RUNNING, update->status().state()); - ASSERT_EQ(taskInfo.task_id(), update->status().task_id()); - - Future<Nothing> _statusUpdateAcknowledgement = - FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement); - - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACKNOWLEDGE); + AWAIT_READY(updateStarting); - Call::Acknowledge* acknowledge = call.mutable_acknowledge(); - acknowledge->mutable_task_id()->CopyFrom(update->status().task_id()); - acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id()); - acknowledge->set_uuid(update->status().uuid()); + ASSERT_EQ(TASK_STARTING, updateStarting->status().state()); + ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id()); - mesos.send(call); - } + AWAIT_READY(updateRunning); - AWAIT_READY(_statusUpdateAcknowledgement); + ASSERT_EQ(TASK_RUNNING, updateRunning->status().state()); + ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id()); // Restart the agent. 
slave.get()->terminate(); @@ -8177,12 +8201,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorReregistrationTimeoutFlag) TaskInfo task = createTask(offers->front(), "sleep 1000"); + Future<TaskStatus> statusUpdate0; Future<TaskStatus> statusUpdate1; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusUpdate0)) .WillOnce(FutureArg<1>(&statusUpdate1)); driver.launchTasks(offers->front().id(), {task}); + AWAIT_READY(statusUpdate0); + ASSERT_EQ(TASK_STARTING, statusUpdate0->state()); + + driver.acknowledgeStatusUpdate(statusUpdate0.get()); + AWAIT_READY(statusUpdate1); ASSERT_EQ(TASK_RUNNING, statusUpdate1->state()); http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/teardown_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/teardown_tests.cpp b/src/tests/teardown_tests.cpp index 5eada4f..392cacf 100644 --- a/src/tests/teardown_tests.cpp +++ b/src/tests/teardown_tests.cpp @@ -353,20 +353,31 @@ TEST_F(TeardownTest, RecoveredFrameworkAfterMasterFailover) TaskInfo task = createTask(offers.get()[0], "sleep 100"); + Future<TaskStatus> startingStatus; Future<TaskStatus> runningStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&startingStatus)) .WillOnce(FutureArg<1>(&runningStatus)); - Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( + Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( slave.get()->pid, &Slave::_statusUpdateAcknowledgement); driver.launchTasks(offers.get()[0].id(), {task}); + AWAIT_READY(startingStatus); + EXPECT_EQ(TASK_STARTING, startingStatus->state()); + EXPECT_EQ(task.task_id(), startingStatus->task_id()); + + AWAIT_READY(statusUpdateAck1); + AWAIT_READY(runningStatus); EXPECT_EQ(TASK_RUNNING, runningStatus->state()); EXPECT_EQ(task.task_id(), runningStatus->task_id()); - AWAIT_READY(statusUpdateAck); + 
AWAIT_READY(statusUpdateAck2); // Simulate master failover. We leave the scheduler without a master // so it does not attempt to re-register.