Added a unit test for persistent volumes using the default executor. This unit test verifies that the task group launched by the default executor can access the executor-level persistent volume through the 'volume/sandbox_path' isolator.
The test is parameterized as the following three cases: 1. posix launcher + volume/sandbox_path (symlink). 2. linux launcher + volume/sandbox_path (symlink). 3. linux launcher + filesystem/linux + volume/sandbox_path (bind mount). Review: https://reviews.apache.org/r/57186/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0a9bfe19 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0a9bfe19 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0a9bfe19 Branch: refs/heads/master Commit: 0a9bfe193bdfd5478fca96855f68d500af005fb1 Parents: 93ea4f6 Author: Gilbert Song <[email protected]> Authored: Mon Mar 6 15:04:47 2017 -0800 Committer: Vinod Kone <[email protected]> Committed: Mon Mar 6 15:04:47 2017 -0800 ---------------------------------------------------------------------- src/tests/default_executor_tests.cpp | 169 ++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0a9bfe19/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index f0ce2c5..e4d43c8 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -1420,6 +1420,175 @@ TEST_P(DefaultExecutorTest, ReservedResources) ASSERT_EQ(taskInfo.task_id(), runningUpdate->status().task_id()); } + +struct LauncherAndIsolationParam +{ + LauncherAndIsolationParam(const string& _launcher, const string& _isolation) + : launcher(_launcher), isolation(_isolation) {} + + const string launcher; + const string isolation; +}; + + +class PersistentVolumeDefaultExecutor + : public MesosTest, + public ::testing::WithParamInterface<LauncherAndIsolationParam> +{ +public: + PersistentVolumeDefaultExecutor() : param(GetParam()) {} + +protected: + 
LauncherAndIsolationParam param; +}; + + +INSTANTIATE_TEST_CASE_P( + LauncherAndIsolationParam, + PersistentVolumeDefaultExecutor, + ::testing::Values( + LauncherAndIsolationParam("posix", "volume/sandbox_path"), + LauncherAndIsolationParam("linux", "volume/sandbox_path"), + LauncherAndIsolationParam( + "linux", + "filesystem/linux,volume/sandbox_path"))); + + +// This test verifies that the default executor can be launched using +// reserved persistent resources which can be accessed by its tasks. +TEST_P(PersistentVolumeDefaultExecutor, ROOT_PersistentResources) +{ + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_role(DEFAULT_TEST_ROLE); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::Resources unreserved = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::Resources reserved = unreserved.flatten( + frameworkInfo.role(), + v1::createReservationInfo(frameworkInfo.principal())).get(); + + v1::Resources volume = v1::createPersistentVolume( + Megabytes(1), + frameworkInfo.role(), + "id1", + "executor_volume_path", + frameworkInfo.principal(), + None(), + frameworkInfo.principal()); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID.value(), + None(), + None(), + v1::ExecutorInfo::DEFAULT); + + v1::Resources executorResources = reserved.apply(v1::CREATE(volume)).get(); + executorInfo.mutable_resources()->CopyFrom(executorResources); + + // Disable AuthN on the agent. 
+ slave::Flags flags = CreateSlaveFlags(); + flags.authenticate_http_readwrite = false; + flags.launcher = param.launcher; + flags.isolation = param.isolation; + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); + ASSERT_SOME(slave); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo), + FutureSatisfy(&connected))); + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. + executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + + const v1::Offer& offer = offers->offers(0); + + // Launch a task that accesses executor's volume. + v1::TaskInfo taskInfo = v1::createTask( + offer.agent_id(), + unreserved, + "test -d task_volume_path"); + + // TODO(gilbert): Refactor the following code once the helper + // to create a 'sandbox_path' volume is suppported. 
+ mesos::v1::ContainerInfo* containerInfo = taskInfo.mutable_container(); + containerInfo->set_type(mesos::v1::ContainerInfo::MESOS); + + mesos::v1::Volume* taskVolume = containerInfo->add_volumes(); + taskVolume->set_mode(mesos::v1::Volume::RW); + taskVolume->set_container_path("task_volume_path"); + + mesos::v1::Volume::Source* source = taskVolume->mutable_source(); + source->set_type(mesos::v1::Volume::Source::SANDBOX_PATH); + + mesos::v1::Volume::Source::SandboxPath* sandboxPath = + source->mutable_sandbox_path(); + + sandboxPath->set_type(mesos::v1::Volume::Source::SandboxPath::PARENT); + sandboxPath->set_path("executor_volume_path"); + + Future<Event::Update> updateRunning; + Future<Event::Update> updateFinished; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(DoAll(FutureArg<1>(&updateRunning), + v1::scheduler::SendAcknowledge( + frameworkId, + offer.agent_id()))) + .WillOnce(FutureArg<1>(&updateFinished)); + + v1::Offer::Operation reserve = v1::RESERVE(reserved); + v1::Offer::Operation create = v1::CREATE(volume); + v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( + executorInfo, + v1::createTaskGroupInfo({taskInfo})); + + mesos.send(v1::createCallAccept( + frameworkId, + offer, + {reserve, create, launchGroup})); + + AWAIT_READY(updateRunning); + ASSERT_EQ(TASK_RUNNING, updateRunning->status().state()); + ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id()); + + AWAIT_READY(updateFinished); + ASSERT_EQ(TASK_FINISHED, updateFinished->status().state()); + ASSERT_EQ(taskInfo.task_id(), updateFinished->status().task_id()); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
