oops sorry. looks like i created a branch instead of committing the patch to head. will fix shortly.
On Mon, Nov 25, 2013 at 3:43 PM, <[email protected]> wrote: > Updated Branches: > refs/heads/vinod/master_ignore_framework [created] 4c11e98c4 > > > Fixed master to ignore messages from unregistered framework. > > Review: https://reviews.apache.org/r/15773 > > > Project: http://git-wip-us.apache.org/repos/asf/mesos/repo > Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4c11e98c > Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4c11e98c > Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4c11e98c > > Branch: refs/heads/vinod/master_ignore_framework > Commit: 4c11e98c4d0385ecb1369c93a0f58595eee54857 > Parents: f051c45 > Author: Vinod Kone <[email protected]> > Authored: Thu Nov 21 17:42:00 2013 -0800 > Committer: Vinod Kone <[email protected]> > Committed: Mon Nov 25 15:36:40 2013 -0800 > > ---------------------------------------------------------------------- > src/master/master.cpp | 391 +++++++++++++++++++------------ > src/master/master.hpp | 5 + > src/tests/fault_tolerance_tests.cpp | 121 ++++++++++ > 3 files changed, 373 insertions(+), 144 deletions(-) > ---------------------------------------------------------------------- > > > > http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.cpp > ---------------------------------------------------------------------- > diff --git a/src/master/master.cpp b/src/master/master.cpp > index a08d012..f13d2a0 100644 > --- a/src/master/master.cpp > +++ b/src/master/master.cpp > @@ -771,7 +771,8 @@ void Master::registerFramework( > } > > if (!elected()) { > - LOG(WARNING) << "Ignoring register framework message since not > elected yet"; > + LOG(WARNING) << "Ignoring register framework message from " << from > + << " since not elected yet"; > return; > } > > @@ -850,8 +851,8 @@ void Master::reregisterFramework( > } > > if (!elected()) { > - LOG(WARNING) << "Ignoring re-register framework message since " > - << "not elected yet"; > + LOG(WARNING) << "Ignoring re-register framework message from " << from > + << " since not elected yet"; > return; > } > > @@ -911,6 +912,15 @@ void Master::reregisterFramework( > // us a different framework name, user name or executor info? > LOG(INFO) << "Framework " << frameworkInfo.id() << " failed over"; > failoverFramework(framework, from); > + } else if (from != framework->pid) { > + LOG(ERROR) > + << "Framework " << frameworkInfo.id() << " at " << from > + << " attempted to re-register while a framework at " << > framework->pid > + << " is already registered"; > + FrameworkErrorMessage message; > + message.set_message("Framework failed over"); > + send(from, message); > + return; > } else { > LOG(INFO) << "Allowing the Framework " << frameworkInfo.id() > << " to re-register with an already used id"; > @@ -920,9 +930,8 @@ void Master::reregisterFramework( > // replied to the offers but the driver might have dropped > // those messages since it wasn't connected to the master. > foreach (Offer* offer, utils::copy(framework->offers)) { > - allocator->resourcesRecovered(offer->framework_id(), > - offer->slave_id(), > - offer->resources()); > + allocator->resourcesRecovered( > + offer->framework_id(), offer->slave_id(), offer->resources()); > removeOffer(offer); > } > > @@ -1000,8 +1009,10 @@ void Master::unregisterFramework( > if (framework->pid == from) { > removeFramework(framework); > } else { > - LOG(WARNING) << from << " tried to unregister framework; " > - << "expecting " << framework->pid; > + LOG(WARNING) > + << "Ignoring unregister framework message for framework " << > frameworkId > + << " from " << from << " because it is not from the registered" > + << " framework " << framework->pid; > } > } > } > @@ -1013,15 +1024,22 @@ void Master::deactivateFramework( > { > Framework* framework = getFramework(frameworkId); > > - if (framework != NULL) { > - if (framework->pid == from) { > - LOG(INFO) << from << " asked to deactivate framework " << > frameworkId; > - deactivate(framework); > - } else { > - LOG(WARNING) << from << " tried to deactivate framework; " > - << "expecting " << framework->pid; > - } > + if (framework == NULL) { > + LOG(WARNING) > + << "Ignoring deactivate framework message for framework " << > frameworkId > + << " because the framework cannot be found"; > + return; > } > + > + if (from != framework->pid) { > + LOG(WARNING) > + << "Ignoring deactivate framework message for framework " << > frameworkId > + << " from '" << from << "' because it is not from the registered" > + << " framework '" << framework->pid << "'"; > + return; > + } > + > + deactivate(framework); > } > > > @@ -1053,177 +1071,257 @@ void Master::deactivate(Framework* framework) > } > > > -void Master::resourceRequest(const FrameworkID& frameworkId, > - const vector<Request>& requests) > +void Master::resourceRequest( > + const UPID& from, > + const FrameworkID& frameworkId, > + const vector<Request>& requests) > { > + Framework* framework = getFramework(frameworkId); > + > + if (framework == NULL) { > + LOG(WARNING) > + << "Ignoring resource request message from framework " << > frameworkId > + << " because the framework cannot be found"; > + return; > + } > + > + if (from != framework->pid) { > + LOG(WARNING) > + << "Ignoring resource request message from framework " << > frameworkId > + << " from '" << from << "' because it is not from the registered " > + << " framework '" << framework->pid << "'"; > + return; > + } > + > + LOG(INFO) << "Requesting resources for framework " << frameworkId; > allocator->resourcesRequested(frameworkId, requests); > } > > > -void Master::launchTasks(const FrameworkID& frameworkId, > - const OfferID& offerId, > - const vector<TaskInfo>& tasks, > - const Filters& filters) > +void Master::launchTasks( > + const UPID& from, > + const FrameworkID& frameworkId, > + const OfferID& offerId, > + const vector<TaskInfo>& tasks, > + const Filters& filters) > { > Framework* framework = getFramework(frameworkId); > - if (framework != NULL) { > - // TODO(benh): Support offer "hoarding" and allow multiple offers > - // *from the same slave* to be used to launch tasks. This can be > - // accomplished rather easily by collecting and merging all offers > - // into a mega-offer and passing that offer to > - // Master::processTasks. > - Offer* offer = getOffer(offerId); > - if (offer != NULL) { > - CHECK_EQ(offer->framework_id(), frameworkId) > - << "Offer " << offerId > - << " has invalid frameworkId " << offer->framework_id(); > - > - Slave* slave = getSlave(offer->slave_id()); > - CHECK(slave != NULL) > - << "Offer " << offerId << " outlived slave " > - << slave->id << " (" << slave->info.hostname() << ")"; > > - // If a slave is disconnected we should've removed its offers. > - CHECK(!slave->disconnected) > - << "Offer " << offerId << " outlived disconnected slave " > - << slave->id << " (" << slave->info.hostname() << ")"; > + if (framework == NULL) { > + LOG(WARNING) > + << "Ignoring launch tasks message for offer " << offerId > + << " of framework " << frameworkId > + << " because the framework cannot be found"; > + return; > + } > > - processTasks(offer, framework, slave, tasks, filters); > - } else { > - // The offer is gone (possibly rescinded, lost slave, re-reply > - // to same offer, etc). Report all tasks in it as failed. > - // TODO: Consider adding a new task state TASK_INVALID for > - // situations like these. > - LOG(WARNING) << "Offer " << offerId << " is no longer valid"; > - foreach (const TaskInfo& task, tasks) { > - StatusUpdateMessage message; > - StatusUpdate* update = message.mutable_update(); > - update->mutable_framework_id()->MergeFrom(frameworkId); > - TaskStatus* status = update->mutable_status(); > - status->mutable_task_id()->MergeFrom(task.task_id()); > - status->set_state(TASK_LOST); > - status->set_message("Task launched with invalid offer"); > - update->set_timestamp(Clock::now().secs()); > - update->set_uuid(UUID::random().toBytes()); > - > - LOG(INFO) << "Sending status update " << *update > - << " for launch task attempt on invalid offer " << > offerId; > - send(framework->pid, message); > - } > + if (from != framework->pid) { > + LOG(WARNING) > + << "Ignoring launch tasks message for offer " << offerId > + << " of framework " << frameworkId << " from '" << from > + << "' because it is not from the registered framework '" > + << framework->pid << "'"; > + return; > + } > + > + // TODO(benh): Support offer "hoarding" and allow multiple offers > + // *from the same slave* to be used to launch tasks. This can be > + // accomplished rather easily by collecting and merging all offers > + // into a mega-offer and passing that offer to > + // Master::processTasks. > + Offer* offer = getOffer(offerId); > + if (offer != NULL) { > + CHECK_EQ(offer->framework_id(), frameworkId) > + << "Offer " << offerId > + << " has invalid frameworkId " << offer->framework_id(); > + > + Slave* slave = getSlave(offer->slave_id()); > + CHECK(slave != NULL) > + << "Offer " << offerId << " outlived slave " > + << slave->id << " (" << slave->info.hostname() << ")"; > + > + // If a slave is disconnected we should've removed its offers. > + CHECK(!slave->disconnected) > + << "Offer " << offerId << " outlived disconnected slave " > + << slave->id << " (" << slave->info.hostname() << ")"; > + > + processTasks(offer, framework, slave, tasks, filters); > + } else { > + // The offer is gone (possibly rescinded, lost slave, re-reply > + // to same offer, etc). Report all tasks in it as failed. > + // TODO: Consider adding a new task state TASK_INVALID for > + // situations like these. > + LOG(WARNING) << "Offer " << offerId << " is no longer valid"; > + foreach (const TaskInfo& task, tasks) { > + StatusUpdateMessage message; > + StatusUpdate* update = message.mutable_update(); > + update->mutable_framework_id()->MergeFrom(frameworkId); > + TaskStatus* status = update->mutable_status(); > + status->mutable_task_id()->MergeFrom(task.task_id()); > + status->set_state(TASK_LOST); > + status->set_message("Task launched with invalid offer"); > + update->set_timestamp(Clock::now().secs()); > + update->set_uuid(UUID::random().toBytes()); > + > + LOG(INFO) << "Sending status update " << *update > + << " for launch task attempt on invalid offer " << > offerId; > + send(framework->pid, message); > } > } > } > > > -void Master::reviveOffers(const FrameworkID& frameworkId) > +void Master::reviveOffers(const UPID& from, const FrameworkID& > frameworkId) > { > Framework* framework = getFramework(frameworkId); > - if (framework != NULL) { > - LOG(INFO) << "Reviving offers for framework " << framework->id; > - allocator->offersRevived(framework->id); > + > + if (framework == NULL) { > + LOG(WARNING) > + << "Ignoring revive offers message for framework " << frameworkId > + << " because the framework cannot be found"; > + return; > + } > + > + if (from != framework->pid) { > + LOG(WARNING) > + << "Ignoring revive offers message for framework " << frameworkId > + << " from '" << from << "' because it is not from the registered" > + << " framework '" << framework->pid << "'"; > + return; > } > + > + LOG(INFO) << "Reviving offers for framework " << framework->id; > + allocator->offersRevived(framework->id); > } > > > -void Master::killTask(const FrameworkID& frameworkId, > - const TaskID& taskId) > +void Master::killTask( > + const UPID& from, > + const FrameworkID& frameworkId, > + const TaskID& taskId) > { > LOG(INFO) << "Asked to kill task " << taskId > << " of framework " << frameworkId; > > Framework* framework = getFramework(frameworkId); > - if (framework != NULL) { > - Task* task = framework->getTask(taskId); > - if (task != NULL) { > - Slave* slave = getSlave(task->slave_id()); > - CHECK(slave != NULL) << "Unknown slave " << task->slave_id(); > - > - // We add the task to 'killedTasks' here because the slave > - // might be partitioned or disconnected but the master > - // doesn't know it yet. > - slave->killedTasks.put(frameworkId, taskId); > - > - // NOTE: This task will be properly reconciled when the > - // disconnected slave re-registers with the master. > - if (!slave->disconnected) { > - LOG(INFO) << "Telling slave " << slave->id << " (" > - << slave->info.hostname() << ")" > - << " to kill task " << taskId > - << " of framework " << frameworkId; > - > - KillTaskMessage message; > - message.mutable_framework_id()->MergeFrom(frameworkId); > - message.mutable_task_id()->MergeFrom(taskId); > - send(slave->pid, message); > - } > - } else { > - // TODO(benh): Once the scheduler has persistance and > - // high-availability of it's tasks, it will be the one that > - // determines that this invocation of 'killTask' is silly, and > - // can just return "locally" (i.e., after hitting only the other > - // replicas). Unfortunately, it still won't know the slave id. > - > - LOG(WARNING) << "Cannot kill task " << taskId > - << " of framework " << frameworkId > - << " because it cannot be found"; > - StatusUpdateMessage message; > - StatusUpdate* update = message.mutable_update(); > - update->mutable_framework_id()->MergeFrom(frameworkId); > - TaskStatus* status = update->mutable_status(); > - status->mutable_task_id()->MergeFrom(taskId); > - status->set_state(TASK_LOST); > - status->set_message("Task not found"); > - update->set_timestamp(Clock::now().secs()); > - update->set_uuid(UUID::random().toBytes()); > - send(framework->pid, message); > + > + if (framework == NULL) { > + LOG(WARNING) > + << "Ignoring kill task message for task " << taskId << " of > framework " > + << frameworkId << " because the framework cannot be found"; > + return; > + } > + > + if (from != framework->pid) { > + LOG(WARNING) > + << "Ignoring kill task message for task " << taskId > + << " of framework " << frameworkId << " from '" << from > + << "' because it is not from the registered framework '" > + << framework->pid << "'"; > + return; > + } > + > + Task* task = framework->getTask(taskId); > + if (task != NULL) { > + Slave* slave = getSlave(task->slave_id()); > + CHECK(slave != NULL) << "Unknown slave " << task->slave_id(); > + > + // We add the task to 'killedTasks' here because the slave > + // might be partitioned or disconnected but the master > + // doesn't know it yet. > + slave->killedTasks.put(frameworkId, taskId); > + > + // NOTE: This task will be properly reconciled when the > + // disconnected slave re-registers with the master. > + if (!slave->disconnected) { > + LOG(INFO) << "Telling slave " << slave->id << " (" > + << slave->info.hostname() << ")" > + << " to kill task " << taskId > + << " of framework " << frameworkId; > + > + KillTaskMessage message; > + message.mutable_framework_id()->MergeFrom(frameworkId); > + message.mutable_task_id()->MergeFrom(taskId); > + send(slave->pid, message); > } > } else { > - LOG(WARNING) << "Failed to kill task " << taskId > + // TODO(benh): Once the scheduler has persistance and > + // high-availability of it's tasks, it will be the one that > + // determines that this invocation of 'killTask' is silly, and > + // can just return "locally" (i.e., after hitting only the other > + // replicas). Unfortunately, it still won't know the slave id. > + > + LOG(WARNING) << "Cannot kill task " << taskId > << " of framework " << frameworkId > - << " because the framework cannot be found"; > + << " because the task cannot be found"; > + StatusUpdateMessage message; > + StatusUpdate* update = message.mutable_update(); > + update->mutable_framework_id()->MergeFrom(frameworkId); > + TaskStatus* status = update->mutable_status(); > + status->mutable_task_id()->MergeFrom(taskId); > + status->set_state(TASK_LOST); > + status->set_message("Task not found"); > + update->set_timestamp(Clock::now().secs()); > + update->set_uuid(UUID::random().toBytes()); > + send(framework->pid, message); > } > } > > > -void Master::schedulerMessage(const SlaveID& slaveId, > - const FrameworkID& frameworkId, > - const ExecutorID& executorId, > - const string& data) > +void Master::schedulerMessage( > + const UPID& from, > + const SlaveID& slaveId, > + const FrameworkID& frameworkId, > + const ExecutorID& executorId, > + const string& data) > { > Framework* framework = getFramework(frameworkId); > - if (framework != NULL) { > - Slave* slave = getSlave(slaveId); > - if (slave != NULL) { > - if (!slave->disconnected) { > - LOG(INFO) << "Sending framework message for framework " > - << frameworkId << " to slave " << slaveId > - << " (" << slave->info.hostname() << ")"; > - > - FrameworkToExecutorMessage message; > - message.mutable_slave_id()->MergeFrom(slaveId); > - message.mutable_framework_id()->MergeFrom(frameworkId); > - message.mutable_executor_id()->MergeFrom(executorId); > - message.set_data(data); > - send(slave->pid, message); > > - stats.validFrameworkMessages++; > - } else { > - LOG(WARNING) << "Cannot send framework message for framework " > - << frameworkId << " to slave " << slaveId > - << " (" << slave->info.hostname() << ")" > - << " because slave is disconnected"; > - stats.invalidFrameworkMessages++; > - } > + if (framework == NULL) { > + LOG(WARNING) > + << "Ignoring framework message for executor " << executorId > + << " of framework " << frameworkId > + << " because the framework cannot be found"; > + stats.invalidFrameworkMessages++; > + return; > + } > + > + if (from != framework->pid) { > + LOG(WARNING) > + << "Ignoring framework message for executor " << executorId > + << " of framework " << frameworkId << " from " << from > + << " because it is not from the registered framework " > + << framework->pid; > + stats.invalidFrameworkMessages++; > + return; > + } > + > + Slave* slave = getSlave(slaveId); > + if (slave != NULL) { > + if (!slave->disconnected) { > + LOG(INFO) << "Sending framework message for framework " > + << frameworkId << " to slave " << slaveId > + << " (" << slave->info.hostname() << ")"; > + > + FrameworkToExecutorMessage message; > + message.mutable_slave_id()->MergeFrom(slaveId); > + message.mutable_framework_id()->MergeFrom(frameworkId); > + message.mutable_executor_id()->MergeFrom(executorId); > + message.set_data(data); > + send(slave->pid, message); > + > + stats.validFrameworkMessages++; > } else { > LOG(WARNING) << "Cannot send framework message for framework " > << frameworkId << " to slave " << slaveId > - << " because slave does not exist"; > + << " (" << slave->info.hostname() << ")" > + << " because slave is disconnected"; > stats.invalidFrameworkMessages++; > } > } else { > LOG(WARNING) << "Cannot send framework message for framework " > << frameworkId << " to slave " << slaveId > - << " because framework does not exist"; > + << " because slave does not exist"; > stats.invalidFrameworkMessages++; > } > } > @@ -1707,6 +1805,11 @@ void Master::offer(const FrameworkID& frameworkId, > } > > > +// TODO(vinod): If due to network partition there are two instances > +// of the framework that think they are leaders and try to > +// authenticate with master they would be stepping on each other's > +// toes. Currently it is tricky to detect this case because the > +// 'authenticate' message doesn't contain the 'FrameworkID'. > void Master::authenticate(const UPID& from, const UPID& pid) > { > // Deactivate the framework if it's already registered. > > > http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/master/master.hpp > ---------------------------------------------------------------------- > diff --git a/src/master/master.hpp b/src/master/master.hpp > index c86c1f1..624c133 100644 > --- a/src/master/master.hpp > +++ b/src/master/master.hpp > @@ -110,19 +110,24 @@ public: > const process::UPID& from, > const FrameworkID& frameworkId); > void resourceRequest( > + const process::UPID& from, > const FrameworkID& frameworkId, > const std::vector<Request>& requests); > void launchTasks( > + const process::UPID& from, > const FrameworkID& frameworkId, > const OfferID& offerId, > const std::vector<TaskInfo>& tasks, > const Filters& filters); > void reviveOffers( > + const process::UPID& from, > const FrameworkID& frameworkId); > void killTask( > + const process::UPID& from, > const FrameworkID& frameworkId, > const TaskID& taskId); > void schedulerMessage( > + const process::UPID& from, > const SlaveID& slaveId, > const FrameworkID& frameworkId, > const ExecutorID& executorId, > > > http://git-wip-us.apache.org/repos/asf/mesos/blob/4c11e98c/src/tests/fault_tolerance_tests.cpp > ---------------------------------------------------------------------- > diff --git a/src/tests/fault_tolerance_tests.cpp > b/src/tests/fault_tolerance_tests.cpp > index 6cb5829..687c981 100644 > --- a/src/tests/fault_tolerance_tests.cpp > +++ b/src/tests/fault_tolerance_tests.cpp > @@ -1379,6 +1379,127 @@ TEST_F(FaultToleranceTest, > SchedulerFailoverFrameworkMessage) > } > > > +// This test verifies that a partitioned framework that still > +// thinks it is registered with the master cannot kill a task because > +// the master has re-registered another instance of the framework. > +// What this test does: > +// 1. Launch a master, slave and scheduler. > +// 2. Scheduler launches a task. > +// 3. Launch a second failed over scheduler. > +// 4. Make the first scheduler believe it is still registered. > +// 5. First scheduler attempts to kill the task which is ignored by the > master. > +TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework) > +{ > + Try<PID<Master> > master = StartMaster(); > + ASSERT_SOME(master); > + > + MockExecutor exec(DEFAULT_EXECUTOR_ID); > + > + Try<PID<Slave> > slave = StartSlave(&exec); > + ASSERT_SOME(slave); > + > + // Start the first scheduler and launch a task. > + MockScheduler sched1; > + MesosSchedulerDriver driver1( > + &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); > + > + FrameworkID frameworkId; > + EXPECT_CALL(sched1, registered(&driver1, _, _)) > + .WillOnce(SaveArg<1>(&frameworkId)); > + > + EXPECT_CALL(sched1, resourceOffers(&driver1, _)) > + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*")) > + .WillRepeatedly(Return()); // Ignore subsequent offers. > + > + Future<TaskStatus> status; > + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) > + .WillOnce(FutureArg<1>(&status)); > + > + ExecutorDriver* execDriver; > + EXPECT_CALL(exec, registered(_, _, _, _)) > + .WillOnce(SaveArg<0>(&execDriver)); > + > + EXPECT_CALL(exec, launchTask(_, _)) > + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); > + > + driver1.start(); > + > + AWAIT_READY(status); > + EXPECT_EQ(TASK_RUNNING, status.get().state()); > + > + // Now start the second failed over scheduler. > + MockScheduler sched2; > + > + FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line. > + framework2 = DEFAULT_FRAMEWORK_INFO; > + framework2.mutable_id()->MergeFrom(frameworkId); > + > + MesosSchedulerDriver driver2( > + &sched2, framework2, master.get(), DEFAULT_CREDENTIAL); > + > + Future<Nothing> registered; > + EXPECT_CALL(sched2, registered(&driver2, frameworkId, _)) > + .WillOnce(FutureSatisfy(®istered)); > + > + EXPECT_CALL(sched2, resourceOffers(&driver2, _)) > + .WillRepeatedly(Return()); // Ignore any offers. > + > + // Drop the framework error message from the master to simulate > + // a partitioned framework. > + Future<FrameworkErrorMessage> frameworkErrorMessage = > + DROP_PROTOBUF(FrameworkErrorMessage(), _ , _); > + > + driver2.start(); > + > + AWAIT_READY(frameworkErrorMessage); > + > + AWAIT_READY(registered); > + > + // Now both the frameworks think they are registered with the > + // master, but the master only knows about the second framework. > + > + // A 'killTask' by first framework should be dropped by the master. > + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) > + .Times(0); > + > + // 'TASK_FINSIHED' by the executor should reach the second framework. > + Future<TaskStatus> status2; > + EXPECT_CALL(sched2, statusUpdate(&driver2, _)) > + .WillOnce(FutureArg<1>(&status2)); > + > + Future<KillTaskMessage> killTaskMessage = > + FUTURE_PROTOBUF(KillTaskMessage(), _, _); > + > + driver1.killTask(status.get().task_id()); > + > + AWAIT_READY(killTaskMessage); > + > + // By this point the master must have processed and ignored the > + // 'killTask' message from the first framework. To verify this, > + // the executor sends 'TASK_FINISHED' to ensure the only update > + // received by the scheduler is 'TASK_FINISHED' and not > + // 'TASK_KILLED'. > + TaskStatus finishedStatus; > + finishedStatus = status.get(); > + finishedStatus.set_state(TASK_FINISHED); > + execDriver->sendStatusUpdate(finishedStatus); > + > + AWAIT_READY(status2); > + EXPECT_EQ(TASK_FINISHED, status2.get().state()); > + > + EXPECT_CALL(exec, shutdown(_)) > + .Times(AtMost(1)); > + > + driver1.stop(); > + driver2.stop(); > + > + driver1.join(); > + driver2.join(); > + > + Shutdown(); > +} > + > + > // This test checks that a scheduler exit shuts down the executor. > TEST_F(FaultToleranceTest, SchedulerExit) > { > >
