This is an automated email from the ASF dual-hosted git repository.
bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new bba0416 Garbage-collected lost tasks which are reported as running
again.
bba0416 is described below
commit bba0416679b21734b5dbdc080c978e20a31664de
Author: Benjamin Bannier <[email protected]>
AuthorDate: Fri Nov 1 13:08:35 2019 +0100
Garbage-collected lost tasks which are reported as running again.
Under certain conditions tasks which were previously `TASK_LOST` and
completed can reappear in non-terminal states, e.g., if the agent on
which they where running reconnect.
This patch adds garbage collection of such completed tasks so that users
do not see tasks twice when obtaining task information from the master
API. This change does not affect tasks status updates where we already
correctly reported a previously `TASK_LOST` state as superseded by e.g.,
`TASK_RUNNING`.
Review: https://reviews.apache.org/r/71641/
---
src/master/master.cpp | 18 +++++
src/tests/master_tests.cpp | 194 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 212 insertions(+)
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 351823e..2fdd6f7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7843,6 +7843,24 @@ void Master::__reregisterSlave(
Framework* framework = getFramework(frameworkId);
if (framework != nullptr) {
framework->unreachableTasks.erase(task.task_id());
+
+ // The master transitions task to terminal state on its own in certain
+ // scenarios (e.g., framework or agent teardown) before instructing the
+ // agent to remove it. However, we are not guaranteed that the message
+ // reaches the agent and is processed by it. If the agent fails to act
+ // on the message, tasks the master has declared terminal might
reappear
+ // from the agent as non-terminal, see e.g., MESOS-9940.
+ //
+ // Avoid tracking a task as both terminal and non-terminal by
+ // garbage-collected completed tasks which come back as running.
+ framework->completedTasks.erase(
+ std::remove_if(
+ framework->completedTasks.begin(),
+ framework->completedTasks.end(),
+ [&](const Owned<Task>& task_) {
+ return task_.get() && task_->task_id() == task.task_id();
+ }),
+ framework->completedTasks.end());
}
const string message = slaves.unreachable.contains(slaveInfo.id())
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 5486e23..9688f5f 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -98,6 +98,8 @@ using
mesos::master::contender::MASTER_CONTENDER_ZK_SESSION_TIMEOUT;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
+using mesos::slave::ContainerTermination;
+
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
@@ -10939,6 +10941,198 @@ TEST_F(MasterTest, CollectAuthorizations)
}
}
+// This test checks that if a task is transitioned to `TASK_LOST`
+// and completed, should the agent come back it is added again and the
+// outdated completed task is removed.
+TEST_F(MasterTest, LostTaskCleanup) {
+ Clock::pause();
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ auto slaveOptions = SlaveOptions(detector.get())
+ .withFlags(CreateSlaveFlags())
+ .withId(process::ID::generate())
+ .withContainerizer(&containerizer);
+ Try<Owned<cluster::Slave>> slave = StartSlave(slaveOptions);
+ ASSERT_SOME(slave);
+
+ Clock::advance(slaveOptions.flags->registration_backoff_factor);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ // Start the scheduler and launch a task.
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->empty());
+ Offer offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ *task.mutable_slave_id() = offer.slave_id();
+ *task.mutable_resources() = offer.resources();
+ *task.mutable_executor() = DEFAULT_EXECUTOR_INFO;
+
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ EXPECT_CALL(containerizer, update(_, _))
+ .WillRepeatedly(Return(Nothing()));
+
+ Promise<Option<ContainerTermination>> hang;
+
+ EXPECT_CALL(containerizer, wait(_))
+ .WillRepeatedly(Return(hang.future()));
+
+ Future<TaskStatus> statusRunning1;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning1));
+
+ driver.launchTasks(offer.id(), {task});
+
+ AWAIT_READY(statusRunning1);
+ ASSERT_EQ(TASK_RUNNING, statusRunning1->state());
+
+ // Shut down the agent. We prevent the task from ultimately shutting
+ // down, and it will be added again when the agent restarts. We use
+ // maintenance mode which will transition all tasks on the agent to
+ // `TASK_LOST`.
+ Future<TaskStatus> statusLost;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusLost));
+
+ EXPECT_CALL(sched, slaveLost(_, _));
+
+ process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Content-Type"] = "application/json";
+
+ MachineID machineId;
+ machineId.set_ip(stringify(slave.get()->pid.address.ip));
+ machineId.set_hostname(*process::address().lookup_hostname());
+
+ auto response = process::http::post(
+ master.get()->pid,
+ "maintenance/schedule",
+ headers,
+ stringify(JSON::protobuf(protobuf::maintenance::createSchedule(
+ {protobuf::maintenance::createWindow(
+ {machineId},
+ protobuf::maintenance::createUnavailability(
+ Clock::now(), Days(365)))}))));
+
+ AWAIT_ASSERT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ // Prevent the executor from shutting down before the agent exits.
+ DROP_PROTOBUF(ShutdownExecutorMessage(), _, _);
+
+ response = process::http::post(
+ master.get()->pid,
+ "machine/down",
+ headers,
+ stringify(JSON::protobuf(
+ protobuf::maintenance::createMachineList({machineId}))));
+
+ AWAIT_ASSERT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ AWAIT_READY(statusLost);
+ ASSERT_EQ(TASK_LOST, statusLost->state());
+ ASSERT_EQ(TaskStatus::SOURCE_MASTER, statusLost->source());
+
+ // Reactivate the agent and restart it. Since the task was never
+ // successfully killed it will be reported as `TASK_RUNNING` again.
+ response = process::http::post(
+ master.get()->pid,
+ "machine/up",
+ headers,
+ stringify(JSON::protobuf(
+ protobuf::maintenance::createMachineList({machineId}))));
+
+ AWAIT_ASSERT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ // Forcibly terminate the agent which still waits for the tasks to terminate.
+ slave.get()->terminate();
+
+ auto reconnectExecutorMessage =
+ FUTURE_PROTOBUF(ReconnectExecutorMessage(), _, _);
+
+ auto slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ EXPECT_CALL(containerizer, recover(_))
+ .WillRepeatedly(Return(Nothing()));
+
+ EXPECT_CALL(exec, reregistered(_, _));
+
+ Future<TaskStatus> statusRunning2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning2));
+
+ slave = StartSlave(slaveOptions);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(reconnectExecutorMessage);
+
+ Clock::advance(slaveOptions.flags->executor_reregistration_timeout);
+ Clock::settle();
+
+ Clock::advance(slaveOptions.flags->registration_backoff_factor);
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ AWAIT_READY(statusRunning2);
+ ASSERT_EQ(TASK_RUNNING, statusRunning2->state());
+ ASSERT_EQ(TaskStatus::SOURCE_MASTER, statusRunning2->source());
+
+ // Confirm that the task is running and not reported as completed.
+ v1::master::Call call;
+ call.set_type(v1::master::Call::GET_TASKS);
+ response = process::http::post(
+ master.get()->pid,
+ "api/v1",
+ headers,
+ serialize(ContentType::JSON, call),
+ stringify(ContentType::JSON));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ Try<v1::master::Response> response_ =
+ deserialize<v1::master::Response>(ContentType::JSON, response->body);
+
+ ASSERT_SOME(response_);
+
+ const RepeatedPtrField<mesos::v1::Task>& tasks =
+ response_->get_tasks().tasks();
+
+ ASSERT_EQ(1, tasks.size()) << JSON::protobuf(tasks);
+ EXPECT_EQ(v1::TASK_RUNNING, tasks[0].state()) << JSON::protobuf(tasks[0]);
+
+ const RepeatedPtrField<mesos::v1::Task>& completedTasks =
+ response_->get_tasks().completed_tasks();
+
+ EXPECT_TRUE(completedTasks.empty()) << JSON::protobuf(completedTasks);
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {