Added duplicated persistence id check in ResourceChecker. Review: https://reviews.apache.org/r/28664
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/22d1f608 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/22d1f608 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/22d1f608 Branch: refs/heads/master Commit: 22d1f608e842f0452ef243b49778ca82c2bfd7b0 Parents: 3f0f275 Author: Jie Yu <[email protected]> Authored: Wed Dec 3 11:25:41 2014 -0800 Committer: Jie Yu <[email protected]> Committed: Wed Dec 3 16:13:44 2014 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 22 ++++++++++ src/tests/resource_offers_tests.cpp | 71 ++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/22d1f608/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index bf9d20f..3dc4e7a 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1899,6 +1899,12 @@ struct ResourceChecker : TaskInfoVisitor const Resources& totalResources, const Resources& usedResources) { + // This is used to ensure no duplicated persistence id exists. + // TODO(jieyu): The check we have right now is a partial check for + // the current task. We need to add checks against slave's + // existing tasks and executors as well. + hashset<string> persistenceIds; + Option<Error> error = Resources::validate(task.resources()); if (error.isSome()) { return Error("Task uses invalid resources: " + error.get().message); @@ -1912,6 +1918,14 @@ struct ResourceChecker : TaskInfoVisitor if (error.isSome()) { return Error("Task uses invalid DiskInfo: " + error.get().message); } + + if (resource.disk().has_persistence()) { + string id = resource.disk().persistence().id(); + if (persistenceIds.contains(id)) { + return Error("Task uses duplicated persistence ID " + id); + } + persistenceIds.insert(id); + } } } @@ -1931,6 +1945,14 @@ struct ResourceChecker : TaskInfoVisitor return Error( "Executor uses invalid DiskInfo: " + error.get().message); } + + if (resource.disk().has_persistence()) { + string id = resource.disk().persistence().id(); + if (persistenceIds.contains(id)) { + return Error("Executor uses duplicated persistence ID " + id); + } + persistenceIds.insert(id); + } } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/22d1f608/src/tests/resource_offers_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp index 467c7e5..e13b6c5 100644 --- a/src/tests/resource_offers_tests.cpp +++ b/src/tests/resource_offers_tests.cpp @@ -926,6 +926,77 @@ TEST_F(TaskValidationTest, NonPersistentDiskInfoWithVolume) Shutdown(); } + +TEST_F(TaskValidationTest, DuplicatedPersistenceIDWithinTask) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + Try<PID<Slave>> slave = StartSlave(); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)) + .Times(1); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + // Create two persistent disk resources with the same id. + Resource diskResource1 = Resources::parse("disk", "128", "role1").get(); + diskResource1.mutable_disk()->CopyFrom(createDiskInfo("1", "1")); + + Resource diskResource2 = Resources::parse("disk", "64", "role1").get(); + diskResource2.mutable_disk()->CopyFrom(createDiskInfo("1", "1")); + + // Include non-persistent disk resource in task resources. + Resources taskResources = + Resources::parse("cpus:1;mem:128").get() + diskResource1 + diskResource2; + + Offer offer = offers.get()[0]; + TaskInfo task = + createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + driver.launchTasks(offer.id(), tasks); + + AWAIT_READY(status); + EXPECT_EQ(task.task_id(), status.get().task_id()); + EXPECT_EQ(TASK_ERROR, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); + EXPECT_TRUE(status.get().has_message()); + EXPECT_TRUE(strings::contains( + status.get().message(), + "Task uses duplicated persistence ID 1")); + + driver.stop(); + driver.join(); + + Shutdown(); +} + +// TODO(jieyu): Add tests for checking duplicated persistence ID +// across task and executors. + +// TODO(jieyu): Add tests for checking duplicated persistence ID +// within an executor. + // TODO(benh): Add tests for checking correct slave IDs. // TODO(benh): Add tests for checking executor resource usage.
