Added a test to verify metrics when shared resources are present. Review: https://reviews.apache.org/r/57964/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b5f595f7 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b5f595f7 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b5f595f7 Branch: refs/heads/master Commit: b5f595f7e7f0941fd0ddc5edb37b2c06062de253 Parents: 25e3b14 Author: Anindya Sinha <[email protected]> Authored: Wed May 3 01:52:33 2017 -0700 Committer: Jiang Yan Xu <[email protected]> Committed: Wed May 3 02:02:05 2017 -0700 ---------------------------------------------------------------------- src/tests/persistent_volume_tests.cpp | 180 +++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b5f595f7/src/tests/persistent_volume_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp index d6a48af..984f1b7 100644 --- a/src/tests/persistent_volume_tests.cpp +++ b/src/tests/persistent_volume_tests.cpp @@ -1189,6 +1189,186 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeRescindOnDestroy) // This test verifies that multiple frameworks belonging to the same role // can use the same shared persistent volume to launch tasks simultaneously. +// It also verifies that metrics for used resources are correctly populated +// on the master and the agent. 
+TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + slave::Flags slaveFlags = CreateSlaveFlags(); + + slaveFlags.resources = getSlaveResources(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + // 1. Create framework1 so that all resources are offered to this framework. + FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO; + frameworkInfo1.set_role(DEFAULT_TEST_ROLE); + frameworkInfo1.add_capabilities()->set_type( + FrameworkInfo::Capability::SHARED_RESOURCES); + + MockScheduler sched1; + MesosSchedulerDriver driver1( + &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched1, registered(&driver1, _, _)); + + Future<vector<Offer>> offers1; + EXPECT_CALL(sched1, resourceOffers(&driver1, _)) + .WillOnce(FutureArg<1>(&offers1)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver1.start(); + + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offers1); + EXPECT_FALSE(offers1->empty()); + + Offer offer1 = offers1.get()[0]; + + // 2. framework1 CREATEs a shared volume, and LAUNCHes a task with a subset + // of resources from the offer. + Resource volume = createPersistentVolume( + getDiskResource(Megabytes(2048)), + "id1", + "path1", + None(), + frameworkInfo1.principal(), + true); // Shared volume. + + // Create a task which uses a portion of the offered resources, so that + // the remaining resources can be offered to framework2. + TaskInfo task1 = createTask( + offer1.slave_id(), + Resources::parse("cpus:1;mem:128").get() + volume, + "echo abc > path1/file1 && sleep 1000"); + + // We should receive a TASK_RUNNING for the launched task. 
+ Future<TaskStatus> status1; + + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&status1)); + + // We use a filter of 0 seconds so the resources will be available + // in the next allocation cycle. + Filters filters; + filters.set_refuse_seconds(0); + + driver1.acceptOffers( + {offer1.id()}, + {CREATE(volume), + LAUNCH({task1})}, + filters); + + AWAIT_READY(status1); + EXPECT_EQ(TASK_RUNNING, status1->state()); + + // Collect metrics based on framework1. + JSON::Object stats1 = Metrics(); + ASSERT_EQ(1u, stats1.values.count("master/cpus_used")); + ASSERT_EQ(1u, stats1.values.count("master/mem_used")); + ASSERT_EQ(1u, stats1.values.count("master/disk_used")); + ASSERT_EQ(1u, stats1.values.count("master/disk_revocable_used")); + EXPECT_EQ(1, stats1.values["master/cpus_used"]); + EXPECT_EQ(128, stats1.values["master/mem_used"]); + EXPECT_EQ(2048, stats1.values["master/disk_used"]); + EXPECT_EQ(0, stats1.values["master/disk_revocable_used"]); + ASSERT_EQ(1u, stats1.values.count("slave/cpus_used")); + ASSERT_EQ(1u, stats1.values.count("slave/mem_used")); + ASSERT_EQ(1u, stats1.values.count("slave/disk_used")); + ASSERT_EQ(1u, stats1.values.count("slave/disk_revocable_used")); + EXPECT_EQ(2048, stats1.values["slave/disk_used"]); + EXPECT_EQ(0, stats1.values["slave/disk_revocable_used"]); + + // 3. Create framework2 of the same role. It would be offered resources + // recovered from the framework1 call. 
+ FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO; + frameworkInfo2.set_role(DEFAULT_TEST_ROLE); + frameworkInfo2.add_capabilities()->set_type( + FrameworkInfo::Capability::SHARED_RESOURCES); + + MockScheduler sched2; + MesosSchedulerDriver driver2( + &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched2, registered(&driver2, _, _)); + + Future<vector<Offer>> offers2; + EXPECT_CALL(sched2, resourceOffers(&driver2, _)) + .WillOnce(FutureArg<1>(&offers2)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver2.start(); + + AWAIT_READY(offers2); + EXPECT_FALSE(offers2->empty()); + + Offer offer2 = offers2.get()[0]; + + EXPECT_TRUE(Resources(offer2.resources()).contains( + allocatedResources(volume, frameworkInfo2.role()))); + + // 4. framework2 LAUNCHes a task with a subset of resources from the offer. + + // Create a task `task2` which uses the same shared volume as `task1`. + TaskInfo task2 = createTask( + offer2.slave_id(), + Resources::parse("cpus:1;mem:256").get() + volume, + "echo abc > path1/file2 && sleep 1000"); + + // We should receive a TASK_RUNNING for the launched task. + Future<TaskStatus> status2; + + EXPECT_CALL(sched2, statusUpdate(&driver2, _)) + .WillOnce(FutureArg<1>(&status2)); + + driver2.acceptOffers( + {offer2.id()}, + {LAUNCH({task2})}, + filters); + + AWAIT_READY(status2); + EXPECT_EQ(TASK_RUNNING, status2->state()); + + // Collect metrics based on both frameworks. Note that the `cpus_used` and + // `mem_used` are updated, but `disk_used` does not change since both tasks + // use the same shared volume. 
+ JSON::Object stats2 = Metrics(); + ASSERT_EQ(1u, stats2.values.count("master/cpus_used")); + ASSERT_EQ(1u, stats2.values.count("master/mem_used")); + ASSERT_EQ(1u, stats2.values.count("master/disk_used")); + ASSERT_EQ(1u, stats2.values.count("master/disk_revocable_used")); + EXPECT_EQ(2, stats2.values["master/cpus_used"]); + EXPECT_EQ(384, stats2.values["master/mem_used"]); + EXPECT_EQ(2048, stats2.values["master/disk_used"]); + EXPECT_EQ(0, stats2.values["master/disk_revocable_used"]); + ASSERT_EQ(1u, stats2.values.count("slave/cpus_used")); + ASSERT_EQ(1u, stats2.values.count("slave/mem_used")); + ASSERT_EQ(1u, stats2.values.count("slave/disk_used")); + ASSERT_EQ(1u, stats2.values.count("slave/disk_revocable_used")); + EXPECT_EQ(2048, stats2.values["slave/disk_used"]); + EXPECT_EQ(0, stats2.values["slave/disk_revocable_used"]); + + // Resume the clock so the terminating task and executor can be reaped. + Clock::resume(); + + driver1.stop(); + driver1.join(); + + driver2.stop(); + driver2.join(); +} + + +// This test verifies that multiple frameworks belonging to the same role +// can use the same shared persistent volume to launch tasks simultaneously. // It also verifies that metrics for used resources are populated on the // master and the agent. TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
