This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 12a26d388 Added old containers upgrade support in the XFS disk
isolator.
12a26d388 is described below
commit 12a26d3885767cdfed268a01f7da7e1d09ce7570
Author: Devin Leamy <[email protected]>
AuthorDate: Tue Jan 16 18:05:55 2024 -0500
Added old containers upgrade support in the XFS disk isolator.
Currently containers that were created prior to enabling the XFS disk
isolator were simply ignored by it. With this change the isolator
assigns XFS project IDs to sandboxes of the old containers upon recovery
and manages them.
---
.../containerizer/mesos/isolators/xfs/disk.cpp | 37 ++++++
src/tests/containerizer/xfs_quota_tests.cpp | 146 +++++++++++++++++++--
2 files changed, 174 insertions(+), 9 deletions(-)
diff --git a/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
b/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
index e1bbb3180..aa865bca5 100644
--- a/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
+++ b/src/slave/containerizer/mesos/isolators/xfs/disk.cpp
@@ -274,6 +274,8 @@ Future<Nothing> XfsDiskIsolatorProcess::recover(
alive.insert(state.container_id());
}
+ vector<string> unmanaged;
+
foreach (const string& sandbox, sandboxes.get()) {
// Skip the "latest" symlink.
if (os::stat::islink(sandbox)) {
@@ -297,6 +299,7 @@ Future<Nothing> XfsDiskIsolatorProcess::recover(
// first time an operator enables the XFS disk isolator and we recover a
// set of containers that we did not isolate.
if (projectId.isNone()) {
+ unmanaged.push_back(sandbox);
continue;
}
@@ -464,6 +467,40 @@ Future<Nothing> XfsDiskIsolatorProcess::recover(
}
}
+ // Assign project IDs to sandboxes that were previously not managed by this
+ // isolator. Quotas will be set later when the containerizer will send an
+ // update call upon executor re-registration.
+ foreach (const string& sandbox, unmanaged) {
+ ContainerID containerId;
+ containerId.set_value(Path(sandbox).basename());
+ CHECK(!infos.contains(containerId));
+
+ // Ignore the container if it is not alive or if it is a known orphan. In
+ // the latter case the containerizer will send a cleanup call for it
anyway.
+ if (orphans.contains(containerId) || !alive.contains(containerId)) {
+ continue;
+ }
+
+ Option<prid_t> projectId = nextProjectId();
+ if (projectId.isNone()) {
+ return Failure(
+ "Failed to assign project to sandbox : Failed to obtain"
+ + " next project ID: range exhausted");
+ }
+
+ infos.put(containerId, Owned<Info>(new Info(sandbox, projectId.get())));
+
+ Try<Nothing> status = xfs::setProjectId(sandbox, projectId.get());
+ if (status.isError()) {
+ return Failure(
+ "Failed to assign project " + stringify(projectId.get()) + ": " +
+ status.error());
+ }
+
+ LOG(INFO) << "Assigned project " << stringify(projectId.get()) << " to '"
+ << sandbox << "'";
+ }
+
return Nothing();
}
diff --git a/src/tests/containerizer/xfs_quota_tests.cpp
b/src/tests/containerizer/xfs_quota_tests.cpp
index 6703a1ca7..3333fe742 100644
--- a/src/tests/containerizer/xfs_quota_tests.cpp
+++ b/src/tests/containerizer/xfs_quota_tests.cpp
@@ -101,6 +101,19 @@ static QuotaInfo makeQuotaInfo(
}
+static bool waitForFileCreation(
+ const string& path,
+ const Duration& duration = Seconds(60))
+{
+ Stopwatch timer;
+ timer.start();
+ while (!os::exists(path) && timer.elapsed() <= duration) {
+ os::sleep(Milliseconds(50));
+ }
+ return os::exists(path);
+}
+
+
class ROOT_XFS_TestBase : public MesosTest
{
public:
@@ -1975,22 +1988,137 @@ TEST_P(ROOT_XFS_QuotaEnforcement, RecoverOldContainers)
process::dispatch(slave.get()->pid, &Slave::usage);
AWAIT_READY(usage);
- // We should still have 1 executor using resources but it doesn't
- // have disk limit enabled.
+ // We should still have 1 executor using resources and now it should have
+ // disk limit enabled.
ASSERT_EQ(1, usage->executors().size());
const ResourceUsage_Executor& executor = usage->executors().Get(0);
ASSERT_TRUE(executor.has_statistics());
+ ASSERT_TRUE(executor.statistics().has_disk_limit_bytes());
+ ASSERT_EQ(Megabytes(1), Bytes(executor.statistics().disk_limit_bytes()));
+ }
+
+ driver.stop();
+ driver.join();
+}
+
+
+// Verify that the XFS disk isolator is able to recover containers that were
+// created before it was enabled and that are now using more disk space than
+// their quota allows. We verify that those containers are terminated due to a
+// disk space limitation.
+TEST_P(ROOT_XFS_QuotaEnforcement, RecoverOldContainersExceedingQuota)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ // `CreateSlaveFlags()` enables `disk/xfs` so here we reset
+ // `isolation` to remove it.
+ slave::Flags flags = CreateSlaveFlags();
+ flags.isolation = "filesystem/linux,docker/runtime";
+
+ if (GetParam() == ParamDiskQuota::ROOTFS) {
+ flags.image_provisioner_backend = "overlay";
+
+ AWAIT_READY(DockerArchive::create(flags.docker_registry, "test_image"));
+ }
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers.get().empty());
+
+ Offer offer = offers.get()[0];
+
+ string containerReadyFile = path::join(flags.work_dir, "container_ready");
+
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:1;mem:128;disk:1").get(),
+ "dd if=/dev/zero of=file bs=1024 count=2048 && touch " +
+ containerReadyFile + " && sleep 1000");
+
+ Future<TaskStatus> startingStatus;
+ Future<TaskStatus> runningStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
+ .WillOnce(FutureArg<1>(&runningStatus));
+
+ driver.launchTasks(offer.id(), {task});
+
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+
+ AWAIT_READY(runningStatus);
+ EXPECT_EQ(task.task_id(), runningStatus->task_id());
+ EXPECT_EQ(TASK_RUNNING, runningStatus->state());
+
+ {
+ Future<ResourceUsage> usage =
+ process::dispatch(slave.get()->pid, &Slave::usage);
+ AWAIT_READY(usage);
+
+ // We should have 1 executor using resources but it doesn't have
+ // disk limit enabled.
+ ASSERT_EQ(1, usage.get().executors().size());
+ const ResourceUsage_Executor& executor = usage->executors().Get(0);
+ ASSERT_TRUE(executor.has_statistics());
ASSERT_FALSE(executor.statistics().has_disk_limit_bytes());
ASSERT_FALSE(executor.statistics().has_disk_used_bytes());
}
- // Verify that we haven't allocated any project IDs.
- JSON::Object metrics = Metrics();
- EXPECT_EQ(
- metrics.at<JSON::Number>("containerizer/mesos/disk/project_ids_total")
- ->as<int>(),
- metrics.at<JSON::Number>("containerizer/mesos/disk/project_ids_free")
- ->as<int>());
+ // Await for the ready file to be written.
+ ASSERT_TRUE(waitForFileCreation(containerReadyFile));
+
+ // Restart the slave.
+ slave.get()->terminate();
+ slave->reset();
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ // This time use the agent flags that include XFS disk isolation.
+ flags = CreateSlaveFlags();
+ flags.xfs_kill_containers = true;
+ slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Wait for the slave to re-register.
+ AWAIT_READY(slaveReregisteredMessage);
+
+ Clock::pause();
+ Clock::settle();
+ Clock::advance(flags.container_disk_watch_interval);
+ Clock::resume();
+
+ AWAIT_READY(status);
+ EXPECT_EQ(task.task_id(), status->task_id());
+ EXPECT_EQ(TASK_FAILED, status->state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+ EXPECT_EQ(TaskStatus::REASON_CONTAINER_LIMITATION_DISK, status->reason());
driver.stop();
driver.join();