Send status updates when an unreachable agent re-registers. The master will send task status updates to frameworks upon agent re-registration if the agent (a) has previously been removed by the master for being unreachable, or (b) is unknown to the master due to the garbage collection of unreachable and gone agents in the registry and the master's state.
Review: https://reviews.apache.org/r/64098/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5e5a8102 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5e5a8102 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5e5a8102 Branch: refs/heads/master Commit: 5e5a8102c3281db25a37157dac123b0ca546e030 Parents: 34503f8 Author: Megha Sharma <[email protected]> Authored: Tue Dec 12 08:21:19 2017 -0800 Committer: Jiang Yan Xu <[email protected]> Committed: Tue Dec 12 10:09:54 2017 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 46 +++++++++-- src/tests/master_allocator_tests.cpp | 8 +- src/tests/master_tests.cpp | 58 +++++++------- src/tests/partition_tests.cpp | 119 ++++++++++++----------------- src/tests/persistent_volume_tests.cpp | 6 +- src/tests/slave_recovery_tests.cpp | 6 +- src/tests/upgrade_tests.cpp | 1 + 7 files changed, 132 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/5e5a8102/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 50475e7..efe8b8f 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -6696,10 +6696,6 @@ void Master::__reregisterSlave( VLOG(1) << "Re-admitted agent " << slaveInfo.id() << " at " << pid << " (" << slaveInfo.hostname() << ")"; - // Ensure we don't remove the slave for not re-registering after - // we've recovered it from the registry. 
- slaves.recovered.erase(slaveInfo.id()); - // For agents without the MULTI_ROLE capability, // we need to inject the allocation role inside // the task and executor resources; @@ -6807,9 +6803,43 @@ void Master::__reregisterSlave( continue; } - Framework* framework = getFramework(frameworkId); - if (framework != nullptr) { - framework->unreachableTasks.erase(task.task_id()); + if (!slaves.recovered.contains(slaveInfo.id())) { + Framework* framework = getFramework(frameworkId); + if (framework != nullptr) { + framework->unreachableTasks.erase(task.task_id()); + } + + const string message = slaves.unreachable.contains(slaveInfo.id()) + ? "Unreachable agent re-reregistered" + : "Unknown agent re-registered"; + + const StatusUpdate& update = protobuf::createStatusUpdate( + task.framework_id(), + task.slave_id(), + task.task_id(), + task.state(), + TaskStatus::SOURCE_MASTER, + None(), + message, + TaskStatus::REASON_SLAVE_REREGISTERED, + (task.has_executor_id() + ? Option<ExecutorID>(task.executor_id()) : None()), + protobuf::getTaskHealth(task), + protobuf::getTaskCheckStatus(task), + None(), + protobuf::getTaskContainerStatus(task)); + + if (framework == nullptr || !framework->connected()) { + LOG(WARNING) << "Dropping update " << update + << (update.status().has_message() + ? " '" + update.status().message() + "'" + : "") + << " for " + << (framework == nullptr ? 
"unknown" : "disconnected") + << " framework " << frameworkId; + } else { + forward(update, UPID(), framework); + } } recoveredTasks.push_back(std::move(task)); @@ -6829,6 +6859,8 @@ void Master::__reregisterSlave( resourceVersion = uuid.get(); } + slaves.recovered.erase(slaveInfo.id()); + Slave* slave = new Slave( this, slaveInfo, http://git-wip-us.apache.org/repos/asf/mesos/blob/5e5a8102/src/tests/master_allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp index 10de6f0..9bca27c 100644 --- a/src/tests/master_allocator_tests.cpp +++ b/src/tests/master_allocator_tests.cpp @@ -1359,6 +1359,8 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst) TestContainerizer containerizer(&exec); TestingMesosSchedulerDriver driver(&sched, &schedulerDetector); Try<Owned<cluster::Slave>> slave = nullptr; + master::Flags masterFlags = this->CreateMasterFlags(); + masterFlags.registry = "replicated_log"; // Explicit scope is to ensure all object associated with the // leading master (e.g. 
allocator) are destroyed once the master @@ -1369,7 +1371,8 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst) EXPECT_CALL(allocator, initialize(_, _, _, _, _, _)); - Try<Owned<cluster::Master>> master = this->StartMaster(&allocator); + Try<Owned<cluster::Master>> master = this->StartMaster( + &allocator, masterFlags); ASSERT_SOME(master); slaveDetector.appoint(master.get()->pid); @@ -1433,7 +1436,8 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst) EXPECT_CALL(sched, registered(&driver, _, _)); - Try<Owned<cluster::Master>> master2 = this->StartMaster(&allocator2); + Try<Owned<cluster::Master>> master2 = this->StartMaster( + &allocator2, masterFlags); ASSERT_SOME(master2); EXPECT_CALL(sched, disconnected(_)); http://git-wip-us.apache.org/repos/asf/mesos/blob/5e5a8102/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 7b08767..a488403 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -3125,23 +3125,20 @@ TEST_F(MasterTest, UnreachableTaskAfterFailover) Future<SlaveReregisteredMessage> slaveReregisteredMessage = FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + Future<TaskStatus> runningAgainStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&runningAgainStatus)); + slaveDetector.appoint(master.get()->pid); Clock::advance(agentFlags.registration_backoff_factor); AWAIT_READY(slaveReregisteredMessage); - // The task should have returned to TASK_RUNNING. This is true even - // for non-partition-aware frameworks, since we emulate the old - // "non-strict registry" semantics. 
- Future<TaskStatus> reconcileUpdate2; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&reconcileUpdate2)); - - driver.reconcileTasks({status}); - - AWAIT_READY(reconcileUpdate2); - EXPECT_EQ(TASK_RUNNING, reconcileUpdate2->state()); - EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2->reason()); + AWAIT_READY(runningAgainStatus); + EXPECT_EQ(TASK_RUNNING, runningAgainStatus->state()); + EXPECT_EQ(TaskStatus::REASON_SLAVE_REREGISTERED, + runningAgainStatus->reason()); + EXPECT_EQ(task.task_id(), runningAgainStatus->task_id()); Clock::resume(); @@ -7446,29 +7443,23 @@ TEST_F(MasterTest, AgentRestartNoReregister) Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); - detector.appoint(master.get()->pid); + Future<TaskStatus> runningAgainStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&runningAgainStatus)); + detector.appoint(master.get()->pid); Clock::advance(agentFlags.registration_backoff_factor); AWAIT_READY(reregisterSlave2); AWAIT_READY(slaveReregistered); - Clock::resume(); + AWAIT_READY(runningAgainStatus); + EXPECT_EQ(TASK_RUNNING, runningAgainStatus->state()); + EXPECT_EQ(TaskStatus::REASON_SLAVE_REREGISTERED, + runningAgainStatus->reason()); + EXPECT_EQ(task.task_id(), runningAgainStatus->task_id()); - TaskStatus status; - status.mutable_task_id()->CopyFrom(task.task_id()); - status.mutable_slave_id()->CopyFrom(slaveId); - status.set_state(TASK_STAGING); // Dummy value. - - Future<TaskStatus> reconcileUpdate; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&reconcileUpdate)); - - driver.reconcileTasks({status}); - - AWAIT_READY(reconcileUpdate); - EXPECT_EQ(TASK_RUNNING, reconcileUpdate->state()); - EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate->reason()); + Clock::resume(); // Check metrics. 
JSON::Object stats = Metrics(); @@ -7920,12 +7911,21 @@ TEST_F(MasterTest, MultiRoleSchedulerUnsubscribeFromRole) Future<SlaveReregisteredMessage> agentReregistered = FUTURE_PROTOBUF( SlaveReregisteredMessage(), master.get()->pid, agent.get()->pid); - detector.appoint(master.get()->pid); + Future<TaskStatus> runningAgainStatus; + EXPECT_CALL(sched2, statusUpdate(&driver2, _)) + .WillOnce(FutureArg<1>(&runningAgainStatus)); + detector.appoint(master.get()->pid); Clock::advance(agentFlags.registration_backoff_factor); AWAIT_READY(agentReregistered); + AWAIT_READY(runningAgainStatus); + EXPECT_EQ(TASK_RUNNING, runningAgainStatus->state()); + EXPECT_EQ(TaskStatus::REASON_SLAVE_REREGISTERED, + runningAgainStatus->reason()); + EXPECT_EQ(task.task_id(), runningAgainStatus->task_id()); + // Check that the framework is re-tracked under the role by the master. { Future<Response> response = process::http::get( http://git-wip-us.apache.org/repos/asf/mesos/blob/5e5a8102/src/tests/partition_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp index 54ccf78..a5a1468 100644 --- a/src/tests/partition_tests.cpp +++ b/src/tests/partition_tests.cpp @@ -399,26 +399,18 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlavePartitionAware) Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF( SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid); + Future<TaskStatus> runningAgainStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&runningAgainStatus)); + detector.appoint(master.get()->pid); Clock::advance(agentFlags.registration_backoff_factor); AWAIT_READY(slaveReregistered); - // Perform explicit reconciliation; the task should still be running. - TaskStatus status; - status.mutable_task_id()->CopyFrom(task.task_id()); - status.mutable_slave_id()->CopyFrom(slaveId); - status.set_state(TASK_STAGING); // Dummy value. 
- - Future<TaskStatus> reconcileUpdate; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&reconcileUpdate)); - - driver.reconcileTasks({status}); - - AWAIT_READY(reconcileUpdate); - EXPECT_EQ(TASK_RUNNING, reconcileUpdate->state()); - EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate->reason()); + AWAIT_READY(runningAgainStatus); + EXPECT_EQ(TASK_RUNNING, runningAgainStatus->state()); + EXPECT_EQ(task.task_id(), runningAgainStatus->task_id()); Clock::resume(); @@ -713,28 +705,18 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware) Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF( SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid); + Future<TaskStatus> runningAgainStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&runningAgainStatus)); + detector.appoint(master.get()->pid); Clock::advance(agentFlags.registration_backoff_factor); AWAIT_READY(slaveReregistered); - // Perform explicit reconciliation. The task should be running - // for the non-PARTITION_AWARE framework. - TaskStatus status; - status.mutable_task_id()->CopyFrom(task.task_id()); - status.mutable_slave_id()->CopyFrom(slaveId); - status.set_state(TASK_STAGING); // Dummy value. 
- - Future<TaskStatus> reconcileUpdate; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&reconcileUpdate)); - - driver.reconcileTasks({status}); - - AWAIT_READY(reconcileUpdate); - EXPECT_EQ(TASK_RUNNING, reconcileUpdate->state()); - EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate->reason()); - EXPECT_FALSE(reconcileUpdate->has_unreachable_time()); + AWAIT_READY(runningAgainStatus); + EXPECT_EQ(TASK_RUNNING, runningAgainStatus->state()); + EXPECT_EQ(task.task_id(), runningAgainStatus->task_id()); Clock::resume(); @@ -833,6 +815,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( Clock::pause(); master::Flags masterFlags = CreateMasterFlags(); + masterFlags.registry = "replicated_log"; Try<Owned<cluster::Master>> master = StartMaster(masterFlags); ASSERT_SOME(master); @@ -1027,6 +1010,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( EXPECT_CALL(sched2, registered(&driver2, _, _)) .WillOnce(FutureSatisfy(&registered2)); + Future<TaskStatus> runningAgainStatus1; + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&runningAgainStatus1)); + + Future<TaskStatus> runningAgainStatus2; + EXPECT_CALL(sched2, statusUpdate(&driver2, _)) + .WillOnce(FutureArg<1>(&runningAgainStatus2)); + // Simulate a new master detected event to the slave and the schedulers. detector.appoint(master.get()->pid); @@ -1038,39 +1029,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( AWAIT_READY(registered1); AWAIT_READY(registered2); - // Have each scheduler perform explicit reconciliation. Both `task1` and - // `task2` should be running: `task2` because it is PARTITION_AWARE, - // `task1` because the master has failed over and we emulate the old - // "non-strict" semantics. - TaskStatus status1; - status1.mutable_task_id()->CopyFrom(task1.task_id()); - status1.mutable_slave_id()->CopyFrom(slaveId); - status1.set_state(TASK_STAGING); // Dummy value.
- - Future<TaskStatus> reconcileUpdate1; - EXPECT_CALL(sched1, statusUpdate(&driver1, _)) - .WillOnce(FutureArg<1>(&reconcileUpdate1)); - - driver1.reconcileTasks({status1}); - - AWAIT_READY(reconcileUpdate1); - EXPECT_EQ(TASK_RUNNING, reconcileUpdate1->state()); - EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1->reason()); - - TaskStatus status2; - status2.mutable_task_id()->CopyFrom(task2.task_id()); - status2.mutable_slave_id()->CopyFrom(slaveId); - status2.set_state(TASK_STAGING); // Dummy value. - - Future<TaskStatus> reconcileUpdate2; - EXPECT_CALL(sched2, statusUpdate(&driver2, _)) - .WillOnce(FutureArg<1>(&reconcileUpdate2)); + AWAIT_READY(runningAgainStatus1); + EXPECT_EQ(TASK_RUNNING, runningAgainStatus1->state()); - driver2.reconcileTasks({status2}); - - AWAIT_READY(reconcileUpdate2); - EXPECT_EQ(TASK_RUNNING, reconcileUpdate2->state()); - EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2->reason()); + AWAIT_READY(runningAgainStatus2); + EXPECT_EQ(TASK_RUNNING, runningAgainStatus2->state()); Clock::resume(); @@ -2327,20 +2290,36 @@ TEST_F(PartitionTest, PartitionAwareTaskCompletedOnPartitionedAgent) Future<UpdateFrameworkMessage> frameworkUpdate = DROP_PROTOBUF( UpdateFrameworkMessage(), master.get()->pid, slave.get()->pid); - Future<TaskStatus> finishedStatus; + // The `finsihedStatusFromMaster` status update is sent as + // part of agent's re-registration with the master. + // The second status update `finsihedStatusFromAgent` here + // is sent by the agent's status update manager after it + // re-regsiters. 
+ Future<TaskStatus> finsihedStatusFromMaster; + Future<TaskStatus> finsihedStatusFromAgent; EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&finishedStatus)); + .WillOnce(FutureArg<1>(&finsihedStatusFromMaster)) + .WillOnce(FutureArg<1>(&finsihedStatusFromAgent)); detector.appoint(master.get()->pid); Clock::advance(agentFlags.registration_backoff_factor); AWAIT_READY(slaveReregistered); + + AWAIT_READY(finsihedStatusFromMaster); + EXPECT_EQ(TASK_FINISHED, finsihedStatusFromMaster->state()); + EXPECT_EQ(TaskStatus::SOURCE_MASTER, finsihedStatusFromMaster->source()); + EXPECT_EQ(TaskStatus::REASON_SLAVE_REREGISTERED, + finsihedStatusFromMaster->reason()); + EXPECT_EQ(task.task_id(), finsihedStatusFromMaster->task_id()); + AWAIT_READY(frameworkUpdate); - AWAIT_READY(finishedStatus); - EXPECT_EQ(TASK_FINISHED, finishedStatus->state()); - EXPECT_EQ(task.task_id(), finishedStatus->task_id()); - EXPECT_EQ(slaveId, finishedStatus->slave_id()); + AWAIT_READY(finsihedStatusFromAgent); + EXPECT_EQ(TASK_FINISHED, finsihedStatusFromAgent->state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, finsihedStatusFromAgent->source()); + EXPECT_EQ(task.task_id(), finsihedStatusFromAgent->task_id()); + EXPECT_EQ(slaveId, finsihedStatusFromAgent->slave_id()); // Perform explicit reconciliation. The task should not be running. TaskStatus status; http://git-wip-us.apache.org/repos/asf/mesos/blob/5e5a8102/src/tests/persistent_volume_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp index f255382..8546769 100644 --- a/src/tests/persistent_volume_tests.cpp +++ b/src/tests/persistent_volume_tests.cpp @@ -1552,7 +1552,9 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks) // volume are still running. 
TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover) { - Try<Owned<cluster::Master>> master = StartMaster(); + master::Flags masterFlags = CreateMasterFlags(); + masterFlags.registry = "replicated_log"; + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); ASSERT_SOME(master); StandaloneMasterDetector detector(master.get()->pid); @@ -1661,7 +1663,7 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover) .WillOnce(FutureArg<1>(&offers2)) .WillRepeatedly(Return()); // Ignore subsequent offers. - master = StartMaster(); + master = StartMaster(masterFlags); ASSERT_SOME(master); // Simulate a new master detected event on the slave so that the http://git-wip-us.apache.org/repos/asf/mesos/blob/5e5a8102/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 253b0fc..fdc4b06 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -4046,7 +4046,9 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover) TYPED_TEST(SlaveRecoveryTest, MasterFailover) { // Step 1. Run a task. 
- Try<Owned<cluster::Master>> master = this->StartMaster(); + master::Flags masterFlags = this->CreateMasterFlags(); + masterFlags.registry = "replicated_log"; + Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags); ASSERT_SOME(master); slave::Flags flags = this->CreateSlaveFlags(); @@ -4110,7 +4112,7 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover) EXPECT_CALL(sched, disconnected(&driver)); master->reset(); - master = this->StartMaster(); + master = this->StartMaster(masterFlags); ASSERT_SOME(master); Future<Nothing> registered; http://git-wip-us.apache.org/repos/asf/mesos/blob/5e5a8102/src/tests/upgrade_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/upgrade_tests.cpp b/src/tests/upgrade_tests.cpp index 0efaa58..dd73d73 100644 --- a/src/tests/upgrade_tests.cpp +++ b/src/tests/upgrade_tests.cpp @@ -103,6 +103,7 @@ TEST_F(UpgradeTest, ReregisterOldAgentWithMultiRoleMaster) // Start a master. master::Flags masterFlags = CreateMasterFlags(); + masterFlags.registry = "replicated_log"; Try<Owned<cluster::Master>> master = StartMaster(masterFlags); ASSERT_SOME(master);
