Added duplicated persistence id check in ResourceChecker.

Review: https://reviews.apache.org/r/28664


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/22d1f608
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/22d1f608
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/22d1f608

Branch: refs/heads/master
Commit: 22d1f608e842f0452ef243b49778ca82c2bfd7b0
Parents: 3f0f275
Author: Jie Yu <[email protected]>
Authored: Wed Dec 3 11:25:41 2014 -0800
Committer: Jie Yu <[email protected]>
Committed: Wed Dec 3 16:13:44 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp               | 22 ++++++++++
 src/tests/resource_offers_tests.cpp | 71 ++++++++++++++++++++++++++++++++
 2 files changed, 93 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/22d1f608/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index bf9d20f..3dc4e7a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1899,6 +1899,12 @@ struct ResourceChecker : TaskInfoVisitor
       const Resources& totalResources,
       const Resources& usedResources)
   {
+    // This is used to ensure no duplicated persistence id exists.
+    // TODO(jieyu): The check we have right now is a partial check for
+    // the current task. We need to add checks against slave's
+    // existing tasks and executors as well.
+    hashset<string> persistenceIds;
+
     Option<Error> error = Resources::validate(task.resources());
     if (error.isSome()) {
       return Error("Task uses invalid resources: " + error.get().message);
@@ -1912,6 +1918,14 @@ struct ResourceChecker : TaskInfoVisitor
         if (error.isSome()) {
           return Error("Task uses invalid DiskInfo: " + error.get().message);
         }
+
+        if (resource.disk().has_persistence()) {
+          string id = resource.disk().persistence().id();
+          if (persistenceIds.contains(id)) {
+            return Error("Task uses duplicated persistence ID " + id);
+          }
+          persistenceIds.insert(id);
+        }
       }
     }
 
@@ -1931,6 +1945,14 @@ struct ResourceChecker : TaskInfoVisitor
             return Error(
                 "Executor uses invalid DiskInfo: " + error.get().message);
           }
+
+          if (resource.disk().has_persistence()) {
+            string id = resource.disk().persistence().id();
+            if (persistenceIds.contains(id)) {
+              return Error("Executor uses duplicated persistence ID " + id);
+            }
+            persistenceIds.insert(id);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/22d1f608/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 467c7e5..e13b6c5 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -926,6 +926,77 @@ TEST_F(TaskValidationTest, NonPersistentDiskInfoWithVolume)
   Shutdown();
 }
 
+
+TEST_F(TaskValidationTest, DuplicatedPersistenceIDWithinTask)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create two persistent disk resources with the same id.
+  Resource diskResource1 = Resources::parse("disk", "128", "role1").get();
+  diskResource1.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
+
+  Resource diskResource2 = Resources::parse("disk", "64", "role1").get();
+  diskResource2.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
+
+  // Include both persistent disk resources (same ID) in task resources.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128").get() + diskResource1 + diskResource2;
+
+  Offer offer = offers.get()[0];
+  TaskInfo task =
+    createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(),
+      "Task uses duplicated persistence ID 1"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+// TODO(jieyu): Add tests for checking duplicated persistence ID
+// across task and executors.
+
+// TODO(jieyu): Add tests for checking duplicated persistence ID
+// within an executor.
+
 // TODO(benh): Add tests for checking correct slave IDs.
 
 // TODO(benh): Add tests for checking executor resource usage.

Reply via email to