Made the default executor populate volume mappings for disk resources. A task can now access any volumes specified in disk resources from its own sandbox, because the default executor populates a mapping from the child container's sandbox to the parent's. Previously, without such a mapping, the child container could not access volumes mounted on the parent container.
Review: https://reviews.apache.org/r/57469 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/28876a44 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/28876a44 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/28876a44 Branch: refs/heads/master Commit: 28876a442da780156d9376c8e7a1671628857bb2 Parents: dcae184 Author: Anand Mazumdar <[email protected]> Authored: Wed Mar 8 17:51:03 2017 -0800 Committer: Anand Mazumdar <[email protected]> Committed: Mon Mar 20 15:39:55 2017 -0700 ---------------------------------------------------------------------- src/launcher/default_executor.cpp | 35 +++++++++ src/tests/default_executor_tests.cpp | 125 ++++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/28876a44/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index cbd4f7e..6a885af 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -335,6 +335,41 @@ protected: launch->mutable_container()->CopyFrom(task.container()); } + // Currently, it is not possible to specify resources for nested + // containers (i.e., all resources are merged in the top level + // executor container). This means that any disk resources used by + // the task are mounted on the top level container. As a workaround, + // we set up the volume mapping allowing child containers to share + // the volumes from their parent containers sandbox. + foreach (const Resource& resource, task.resources()) { + // Ignore if there are no disk resources or if the + // disk resources did not specify a volume mapping. 
+ if (!resource.has_disk() || !resource.disk().has_volume()) { + continue; + } + + // Set `ContainerInfo.type` to 'MESOS' if the task did + // not specify a container. + if (!task.has_container()) { + launch->mutable_container()->set_type(ContainerInfo::MESOS); + } + + const Volume& executorVolume = resource.disk().volume(); + + Volume* taskVolume = launch->mutable_container()->add_volumes(); + taskVolume->set_mode(executorVolume.mode()); + taskVolume->set_container_path(executorVolume.container_path()); + + Volume::Source* source = taskVolume->mutable_source(); + source->set_type(Volume::Source::SANDBOX_PATH); + + Volume::Source::SandboxPath* sandboxPath = + source->mutable_sandbox_path(); + + sandboxPath->set_type(Volume::Source::SandboxPath::PARENT); + sandboxPath->set_path(executorVolume.container_path()); + } + responses.push_back(post(connection.get(), call)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/28876a44/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index b701e7d..cbce486 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -1542,6 +1542,131 @@ TEST_P(PersistentVolumeDefaultExecutor, ROOT_PersistentResources) ASSERT_EQ(taskInfo.task_id(), updateFinished->status().task_id()); } + +// This test verifies that the default executor mounts the persistent volume +// in the task container when it is set on a task in the task group. +TEST_P(PersistentVolumeDefaultExecutor, ROOT_TaskSandboxPersistentVolume) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Disable AuthN on the agent. 
+ slave::Flags flags = CreateSlaveFlags(); + flags.authenticate_http_readwrite = false; + flags.launcher = param.launcher; + flags.isolation = param.isolation; + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); + ASSERT_SOME(slave); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_role(DEFAULT_TEST_ROLE); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo), + FutureSatisfy(&connected))); + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. 
+ + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + v1::Resources unreserved = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID.value(), + None(), + None(), + v1::ExecutorInfo::DEFAULT); + + executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + executorInfo.mutable_resources()->CopyFrom(unreserved); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + + const v1::Offer& offer = offers->offers(0); + + v1::Resource volume = v1::createPersistentVolume( + Megabytes(1), + frameworkInfo.role(), + "id1", + "task_volume_path", + frameworkInfo.principal(), + None(), + frameworkInfo.principal()); + + v1::Resources reserved = unreserved.flatten( + frameworkInfo.role(), + v1::createReservationInfo(frameworkInfo.principal())).get(); + + // Launch a task that expects the persistent volume to be + // mounted in its sandbox. + v1::TaskInfo taskInfo = v1::createTask( + offer.agent_id(), + reserved.apply(v1::CREATE(volume)).get(), + "echo abc > task_volume_path/file"); + + v1::Offer::Operation reserve = v1::RESERVE(reserved); + v1::Offer::Operation create = v1::CREATE(volume); + v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( + executorInfo, + v1::createTaskGroupInfo({taskInfo})); + + Future<Event::Update> updateRunning; + Future<Event::Update> updateFinished; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(DoAll(FutureArg<1>(&updateRunning), + v1::scheduler::SendAcknowledge( + frameworkId, + offer.agent_id()))) + .WillOnce(FutureArg<1>(&updateFinished)); + + mesos.send(v1::createCallAccept( + frameworkId, + offer, + {reserve, create, launchGroup})); + + AWAIT_READY(updateRunning); + ASSERT_EQ(TASK_RUNNING, updateRunning->status().state()); + ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id()); + + AWAIT_READY(updateFinished); + ASSERT_EQ(TASK_FINISHED, updateFinished->status().state()); + 
ASSERT_EQ(taskInfo.task_id(), updateFinished->status().task_id()); + + string volumePath = slave::paths::getPersistentVolumePath( + flags.work_dir, + devolve(volume)); + + string filePath = path::join(volumePath, "file"); + + // Ensure that the task was able to write to the persistent volume. + EXPECT_SOME_EQ("abc\n", os::read(filePath)); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
