Repository: mesos
Updated Branches:
  refs/heads/master 6380d3adc -> acd656c4e


Fixed the bug in validating duplicated task ID and added a test.

Review: https://reviews.apache.org/r/27903


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/acd656c4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/acd656c4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/acd656c4

Branch: refs/heads/master
Commit: acd656c4eb658d8c6ab9737a27af6547038f2999
Parents: 6380d3a
Author: Jie Yu <[email protected]>
Authored: Tue Nov 11 19:18:48 2014 -0800
Committer: Jie Yu <[email protected]>
Committed: Fri Nov 14 22:41:44 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp               |  13 +-
 src/tests/resource_offers_tests.cpp | 764 +++++++++++++++++--------------
 2 files changed, 433 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/acd656c4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4b5d582..83c2f8a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2591,10 +2591,11 @@ void Master::_launchTasks(
     const TaskInfo& task = tasks[index++];
 
     // NOTE: The task will not be in 'pendingTasks' if 'killTask()'
-    // for the task was called before we are here.
-    if (!framework->pendingTasks.contains(task.task_id())) {
-      continue;
-    }
+    // for the task was called before we are here. No need to launch
+    // the task if it's no longer pending. However, we still need to
+    // check the authorization result and do the validation so that we
+    // can send status update in case the task has duplicated ID.
+    bool pending = framework->pendingTasks.contains(task.task_id());
 
     // Remove from pending tasks.
     framework->pendingTasks.erase(task.task_id());
@@ -2656,7 +2657,9 @@ void Master::_launchTasks(
     }
 
     // Launch task.
-    usedResources += launchTask(task, framework, slave);
+    if (pending) {
+      usedResources += launchTask(task, framework, slave);
+    }
   }
 
   // All used resources should be allocatable, enforced by our validators.

http://git-wip-us.apache.org/repos/asf/mesos/blob/acd656c4/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp 
b/src/tests/resource_offers_tests.cpp
index 21cb5ad..43820b0 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -53,59 +53,15 @@ using testing::AtMost;
 using testing::Return;
 
 
-class ResourceOffersTest : public MesosTest {};
-
-
-TEST_F(ResourceOffersTest, ResourceOfferWithMultipleSlaves)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  // Start 10 slaves.
-  for (int i = 0; i < 10; i++) {
-    slave::Flags flags = CreateSlaveFlags();
-
-    flags.resources = Option<std::string>("cpus:2;mem:1024");
-
-    Try<PID<Slave> > slave = StartSlave(flags);
-    ASSERT_SOME(slave);
-  }
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .Times(1);
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // All 10 slaves might not be in first offer.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-  EXPECT_GE(10u, offers.get().size());
-
-  Resources resources(offers.get()[0].resources());
-  EXPECT_EQ(2, resources.get("cpus", Value::Scalar()).value());
-  EXPECT_EQ(1024, resources.get("mem", Value::Scalar()).value());
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-}
+class TaskValidationTest : public MesosTest {};
 
 
-TEST_F(ResourceOffersTest, TaskUsesInvalidFrameworkID)
+TEST_F(TaskValidationTest, TaskUsesInvalidFrameworkID)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  Try<PID<Slave>> slave = StartSlave();
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -141,12 +97,12 @@ TEST_F(ResourceOffersTest, TaskUsesInvalidFrameworkID)
 }
 
 
-TEST_F(ResourceOffersTest, TaskUsesCommandInfoAndExecutorInfo)
+TEST_F(TaskValidationTest, TaskUsesCommandInfoAndExecutorInfo)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  Try<PID<Slave>> slave = StartSlave();
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -155,7 +111,7 @@ TEST_F(ResourceOffersTest, 
TaskUsesCommandInfoAndExecutorInfo)
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
-  Future<vector<Offer> > offers;
+  Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
@@ -190,12 +146,12 @@ TEST_F(ResourceOffersTest, 
TaskUsesCommandInfoAndExecutorInfo)
 }
 
 
-TEST_F(ResourceOffersTest, TaskUsesNoResources)
+TEST_F(TaskValidationTest, TaskUsesNoResources)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  Try<PID<Slave>> slave = StartSlave();
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -205,7 +161,7 @@ TEST_F(ResourceOffersTest, TaskUsesNoResources)
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
-  Future<vector<Offer> > offers;
+  Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
@@ -244,12 +200,12 @@ TEST_F(ResourceOffersTest, TaskUsesNoResources)
 }
 
 
-TEST_F(ResourceOffersTest, TaskUsesInvalidResources)
+TEST_F(TaskValidationTest, TaskUsesInvalidResources)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  Try<PID<Slave>> slave = StartSlave();
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -259,7 +215,7 @@ TEST_F(ResourceOffersTest, TaskUsesInvalidResources)
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
-  Future<vector<Offer> > offers;
+  Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
@@ -303,12 +259,12 @@ TEST_F(ResourceOffersTest, TaskUsesInvalidResources)
 }
 
 
-TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered)
+TEST_F(TaskValidationTest, TaskUsesMoreResourcesThanOffered)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  Try<PID<Slave>> slave = StartSlave();
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -318,7 +274,7 @@ TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered)
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
-  Future<vector<Offer> > offers;
+  Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
@@ -365,212 +321,192 @@ TEST_F(ResourceOffersTest, 
TaskUsesMoreResourcesThanOffered)
 }
 
 
-TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterFrameworkStops)
+// This test verifies that if two tasks are launched with the same
+// task ID, the second task will get rejected.
+TEST_F(TaskValidationTest, DuplicatedTaskID)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave = StartSlave(&exec);
   ASSERT_SOME(slave);
 
-  MockScheduler sched1;
-  MesosSchedulerDriver driver1(
-      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  EXPECT_CALL(sched1, registered(&driver1, _, _))
-    .Times(1);
+  EXPECT_CALL(sched, registered(&driver, _, _));
 
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
-    .WillOnce(FutureArg<1>(&offers));
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
-  driver1.start();
+  driver.start();
 
   AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
 
-  driver1.stop();
-  driver1.join();
-
-  MockScheduler sched2;
-  MesosSchedulerDriver driver2(
-      &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched2, registered(&driver2, _, _))
-    .Times(1);
-
-  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
-    .WillOnce(FutureArg<1>(&offers));
-
-  driver2.start();
-
-  AWAIT_READY(offers);
-
-  driver2.stop();
-  driver2.join();
-
-  Shutdown();
-}
-
-
-TEST_F(ResourceOffersTest, ResourcesGetReofferedWhenUnused)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  Try<PID<Slave> > slave = StartSlave();
-  ASSERT_SOME(slave);
-
-  MockScheduler sched1;
-  MesosSchedulerDriver driver1(
-      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  ExecutorInfo executor;
+  executor.mutable_executor_id()->set_value("default");
+  executor.mutable_command()->set_value("exit 1");
 
-  EXPECT_CALL(sched1, registered(&driver1, _, _))
-    .Times(1);
+  // Create two tasks with the same id.
+  TaskInfo task1;
+  task1.set_name("");
+  task1.mutable_task_id()->set_value("1");
+  task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  
task1.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:32").get());
+  task1.mutable_executor()->MergeFrom(executor);
 
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
-    .WillOnce(FutureArg<1>(&offers));
+  TaskInfo task2;
+  task2.set_name("");
+  task2.mutable_task_id()->set_value("1");
+  task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  
task2.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:32").get());
+  task2.mutable_executor()->MergeFrom(executor);
 
-  driver1.start();
+  vector<TaskInfo> tasks;
+  tasks.push_back(task1);
+  tasks.push_back(task2);
 
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
+  EXPECT_CALL(exec, registered(_, _, _, _));
 
-  vector<TaskInfo> tasks; // Use nothing!
-  driver1.launchTasks(offers.get()[0].id(), tasks);
+  // Grab the first task but don't send a status update.
+  Future<TaskInfo> task;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(FutureArg<1>(&task));
 
-  MockScheduler sched2;
-  MesosSchedulerDriver driver2(
-      &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
 
-  EXPECT_CALL(sched2, registered(&driver2, _, _))
-    .Times(1);
+  driver.launchTasks(offers.get()[0].id(), tasks);
 
-  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
-    .WillOnce(FutureArg<1>(&offers));
+  AWAIT_READY(task);
+  EXPECT_EQ(task1.task_id(), task.get().task_id());
 
-  driver2.start();
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
 
-  AWAIT_READY(offers);
+  EXPECT_TRUE(strings::startsWith(
+      status.get().message(), "Task has duplicate ID"));
 
-  // Stop first framework before second so no offers are sent.
-  driver1.stop();
-  driver1.join();
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
 
-  driver2.stop();
-  driver2.join();
+  driver.stop();
+  driver.join();
 
   Shutdown();
 }
 
 
-TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
+// This test verifies that two tasks launched on the same slave with
+// the same executor id but different executor info are rejected.
+TEST_F(TaskValidationTest, ExecutorInfoDiffersOnSameSlave)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave = StartSlave(&exec);
   ASSERT_SOME(slave);
 
-  MockScheduler sched1;
-  MesosSchedulerDriver driver1(
-      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  EXPECT_CALL(sched1, registered(&driver1, _, _))
+  EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
-  driver1.start();
+  driver.start();
 
   AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
 
-  TaskInfo task;
-  task.set_name("");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+  ExecutorInfo executor;
+  executor.mutable_executor_id()->set_value("default");
+  executor.mutable_command()->set_value("exit 1");
 
-  Resource* cpus = task.add_resources();
-  cpus->set_name("cpus");
-  cpus->set_type(Value::SCALAR);
-  cpus->mutable_scalar()->set_value(0);
+  TaskInfo task1;
+  task1.set_name("");
+  task1.mutable_task_id()->set_value("1");
+  task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task1.mutable_resources()->MergeFrom(
+      Resources::parse("cpus:1;mem:512").get());
+  task1.mutable_executor()->MergeFrom(executor);
 
-  Resource* mem = task.add_resources();
-  mem->set_name("mem");
-  mem->set_type(Value::SCALAR);
-  mem->mutable_scalar()->set_value(Gigabytes(1).bytes());
+  executor.mutable_command()->set_value("exit 2");
+
+  TaskInfo task2;
+  task2.set_name("");
+  task2.mutable_task_id()->set_value("2");
+  task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task2.mutable_resources()->MergeFrom(
+      Resources::parse("cpus:1;mem:512").get());
+  task2.mutable_executor()->MergeFrom(executor);
 
   vector<TaskInfo> tasks;
-  tasks.push_back(task);
+  tasks.push_back(task1);
+  tasks.push_back(task2);
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  // Grab the "good" task but don't send a status update.
+  Future<TaskInfo> task;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(FutureArg<1>(&task));
 
   Future<TaskStatus> status;
-  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status));
 
-  driver1.launchTasks(offers.get()[0].id(), tasks);
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(task);
+  EXPECT_EQ(task1.task_id(), task.get().task_id());
 
   AWAIT_READY(status);
-  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(task2.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_ERROR, status.get().state());
   EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
-  EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message());
-
-  MockScheduler sched2;
-  MesosSchedulerDriver driver2(
-      &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched2, registered(&driver2, _, _))
-    .Times(1);
-
-  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver2.start();
-
-  AWAIT_READY(offers);
+  EXPECT_TRUE(strings::contains(
+      status.get().message(), "Task has invalid ExecutorInfo"));
 
-  driver1.stop();
-  driver1.join();
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
 
-  driver2.stop();
-  driver2.join();
+  driver.stop();
+  driver.join();
 
   Shutdown();
 }
 
-// TODO(benh): Add tests for checking correct slave IDs.
-
-// TODO(benh): Add tests for checking executor resource usage.
-
-// TODO(benh): Add tests which launch multiple tasks and check for
-// unique task IDs and aggregate resource usage.
-
 
-TEST_F(ResourceOffersTest, Request)
+// This test verifies that two tasks each launched on a different
+// slave with same executor id but different executor info are
+// allowed.
+TEST_F(TaskValidationTest, ExecutorInfoDiffersOnDifferentSlaves)
 {
-  MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
-
-  EXPECT_CALL(allocator, initialize(_, _, _))
-    .Times(1);
-
-  Try<PID<Master> > master = StartMaster(&allocator);
-  ASSERT_SOME(master);
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
 
   MockScheduler sched;
   MesosSchedulerDriver driver(
       &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  EXPECT_CALL(allocator, frameworkAdded(_, _, _))
-    .Times(1);
-
   Future<Nothing> registered;
   EXPECT_CALL(sched, registered(&driver, _, _))
     .WillOnce(FutureSatisfy(&registered));
@@ -579,27 +515,91 @@ TEST_F(ResourceOffersTest, Request)
 
   AWAIT_READY(registered);
 
-  vector<Request> sent;
-  Request request;
-  request.mutable_slave_id()->set_value("test");
-  sent.push_back(request);
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
 
-  Future<vector<Request> > received;
-  EXPECT_CALL(allocator, resourcesRequested(_, _))
-    .WillOnce(FutureArg<1>(&received));
+  // Start the first slave.
+  MockExecutor exec1(DEFAULT_EXECUTOR_ID);
 
-  driver.requestResources(sent);
+  Try<PID<Slave>> slave1 = StartSlave(&exec1);
+  ASSERT_SOME(slave1);
 
-  AWAIT_READY(received);
-  EXPECT_EQ(sent.size(), received.get().size());
-  EXPECT_NE(0u, received.get().size());
-  EXPECT_EQ(request.slave_id(), received.get()[0].slave_id());
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
 
-  EXPECT_CALL(allocator, frameworkDeactivated(_))
-    .Times(AtMost(1)); // Races with shutting down the cluster.
+  // Launch the first task with the default executor id.
+  ExecutorInfo executor1;
+  executor1 = DEFAULT_EXECUTOR_INFO;
+  executor1.mutable_command()->set_value("exit 1");
 
-  EXPECT_CALL(allocator, frameworkRemoved(_))
-    .Times(AtMost(1)); // Races with shutting down the cluster.
+  TaskInfo task1 = createTask(
+      offers1.get()[0], executor1.command().value(), executor1.executor_id());
+
+  vector<TaskInfo> tasks1;
+  tasks1.push_back(task1);
+
+  EXPECT_CALL(exec1, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec1, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1));
+
+  driver.launchTasks(offers1.get()[0].id(), tasks1);
+
+  AWAIT_READY(status1);
+  ASSERT_EQ(TASK_RUNNING, status1.get().state());
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Now start the second slave.
+  MockExecutor exec2(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave2 = StartSlave(&exec2);
+  ASSERT_SOME(slave2);
+
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+
+  // Now launch the second task with the same executor id but
+  // a different executor command.
+  ExecutorInfo executor2;
+  executor2 = executor1;
+  executor2.mutable_command()->set_value("exit 2");
+
+  TaskInfo task2 = createTask(
+      offers2.get()[0], executor2.command().value(), executor2.executor_id());
+
+  vector<TaskInfo> tasks2;
+  tasks2.push_back(task2);
+
+  EXPECT_CALL(exec2, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec2, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  driver.launchTasks(offers2.get()[0].id(), tasks2);
+
+  AWAIT_READY(status2);
+  ASSERT_EQ(TASK_RUNNING, status2.get().state());
+
+  EXPECT_CALL(exec1, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(exec2, shutdown(_))
+    .Times(AtMost(1));
 
   driver.stop();
   driver.join();
@@ -608,19 +608,31 @@ TEST_F(ResourceOffersTest, Request)
 }
 
 
-class MultipleExecutorsTest : public MesosTest {};
+// TODO(benh): Add tests for checking correct slave IDs.
 
-// This test verifies that two tasks launched on the same slave with
-// the same executor id but different executor info are rejected.
-TEST_F(MultipleExecutorsTest, ExecutorInfoDiffersOnSameSlave)
+// TODO(benh): Add tests for checking executor resource usage.
+
+// TODO(benh): Add tests which launch multiple tasks and check for
+// aggregate resource usage.
+
+
+class ResourceOffersTest : public MesosTest {};
+
+
+TEST_F(ResourceOffersTest, ResourceOfferWithMultipleSlaves)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  // Start 10 slaves.
+  for (int i = 0; i < 10; i++) {
+    slave::Flags flags = CreateSlaveFlags();
 
-  Try<PID<Slave> > slave = StartSlave(&exec);
-  ASSERT_SOME(slave);
+    flags.resources = Option<std::string>("cpus:2;mem:1024");
+
+    Try<PID<Slave>> slave = StartSlave(flags);
+    ASSERT_SOME(slave);
+  }
 
   MockScheduler sched;
   MesosSchedulerDriver driver(
@@ -629,182 +641,256 @@ TEST_F(MultipleExecutorsTest, 
ExecutorInfoDiffersOnSameSlave)
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
-  Future<vector<Offer> > offers;
+  Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
+    .WillRepeatedly(Return()); // All 10 slaves might not be in first offer.
 
   driver.start();
 
   AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
+  EXPECT_GE(10u, offers.get().size());
 
-  ExecutorInfo executor;
-  executor.mutable_executor_id()->set_value("default");
-  executor.mutable_command()->set_value("exit 1");
+  Resources resources(offers.get()[0].resources());
+  EXPECT_EQ(2, resources.get("cpus", Value::Scalar()).value());
+  EXPECT_EQ(1024, resources.get("mem", Value::Scalar()).value());
 
-  TaskInfo task1;
-  task1.set_name("");
-  task1.mutable_task_id()->set_value("1");
-  task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task1.mutable_resources()->MergeFrom(
-      Resources::parse("cpus:1;mem:512").get());
-  task1.mutable_executor()->MergeFrom(executor);
+  driver.stop();
+  driver.join();
 
-  executor.mutable_command()->set_value("exit 2");
+  Shutdown();
+}
 
-  TaskInfo task2;
-  task2.set_name("");
-  task2.mutable_task_id()->set_value("2");
-  task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task2.mutable_resources()->MergeFrom(
-      Resources::parse("cpus:1;mem:512").get());
-  task2.mutable_executor()->MergeFrom(executor);
 
-  vector<TaskInfo> tasks;
-  tasks.push_back(task1);
-  tasks.push_back(task2);
+TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterFrameworkStops)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
 
-  EXPECT_CALL(exec, registered(_, _, _, _))
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
     .Times(1);
 
-  // Grab the "good" task but don't send a status update.
-  Future<TaskInfo> task;
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(FutureArg<1>(&task));
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers));
 
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
+  driver1.start();
 
-  driver.launchTasks(offers.get()[0].id(), tasks);
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
 
-  AWAIT_READY(task);
-  EXPECT_EQ(task1.task_id(), task.get().task_id());
+  driver1.stop();
+  driver1.join();
 
-  AWAIT_READY(status);
-  EXPECT_EQ(task2.task_id(), status.get().task_id());
-  EXPECT_EQ(TASK_ERROR, status.get().state());
-  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
-  EXPECT_TRUE(status.get().has_message());
-  EXPECT_TRUE(strings::contains(
-      status.get().message(), "Task has invalid ExecutorInfo"));
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
+  EXPECT_CALL(sched2, registered(&driver2, _, _))
+    .Times(1);
 
-  driver.stop();
-  driver.join();
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver2.start();
+
+  AWAIT_READY(offers);
+
+  driver2.stop();
+  driver2.join();
 
   Shutdown();
 }
 
 
-// This test verifies that two tasks each launched on a different
-// slave with same executor id but different executor info are
-// allowed.
-TEST_F(MultipleExecutorsTest, ExecutorInfoDiffersOnDifferentSlaves)
+TEST_F(ResourceOffersTest, ResourcesGetReofferedWhenUnused)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
 
-  Future<Nothing> registered;
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(FutureSatisfy(&registered));
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  driver.start();
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .Times(1);
 
-  AWAIT_READY(registered);
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers));
 
-  Future<vector<Offer> > offers1;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers1));
+  driver1.start();
 
-  // Start the first slave.
-  MockExecutor exec1(DEFAULT_EXECUTOR_ID);
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
 
-  Try<PID<Slave> > slave1 = StartSlave(&exec1);
-  ASSERT_SOME(slave1);
+  vector<TaskInfo> tasks; // Use nothing!
+  driver1.launchTasks(offers.get()[0].id(), tasks);
 
-  AWAIT_READY(offers1);
-  EXPECT_NE(0u, offers1.get().size());
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  // Launch the first task with the default executor id.
-  ExecutorInfo executor1;
-  executor1 = DEFAULT_EXECUTOR_INFO;
-  executor1.mutable_command()->set_value("exit 1");
+  EXPECT_CALL(sched2, registered(&driver2, _, _))
+    .Times(1);
 
-  TaskInfo task1 = createTask(
-      offers1.get()[0], executor1.command().value(), executor1.executor_id());
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillOnce(FutureArg<1>(&offers));
 
-  vector<TaskInfo> tasks1;
-  tasks1.push_back(task1);
+  driver2.start();
 
-  EXPECT_CALL(exec1, registered(_, _, _, _))
+  AWAIT_READY(offers);
+
+  // Stop first framework before second so no offers are sent.
+  driver1.stop();
+  driver1.join();
+
+  driver2.stop();
+  driver2.join();
+
+  Shutdown();
+}
+
+
+TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
     .Times(1);
 
-  EXPECT_CALL(exec1, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
-  Future<TaskStatus> status1;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status1));
+  driver1.start();
 
-  driver.launchTasks(offers1.get()[0].id(), tasks1);
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
 
-  AWAIT_READY(status1);
-  ASSERT_EQ(TASK_RUNNING, status1.get().state());
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
 
-  Future<vector<Offer> > offers2;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers2))
+  Resource* cpus = task.add_resources();
+  cpus->set_name("cpus");
+  cpus->set_type(Value::SCALAR);
+  cpus->mutable_scalar()->set_value(0);
+
+  Resource* mem = task.add_resources();
+  mem->set_name("mem");
+  mem->set_type(Value::SCALAR);
+  mem->mutable_scalar()->set_value(Gigabytes(1).bytes());
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver1.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message());
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched2, registered(&driver2, _, _))
+    .Times(1);
+
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillOnce(FutureArg<1>(&offers))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
-  // Now start the second slave.
-  MockExecutor exec2(DEFAULT_EXECUTOR_ID);
+  driver2.start();
 
-  Try<PID<Slave> > slave2 = StartSlave(&exec2);
-  ASSERT_SOME(slave2);
+  AWAIT_READY(offers);
 
-  AWAIT_READY(offers2);
-  EXPECT_NE(0u, offers2.get().size());
+  driver1.stop();
+  driver1.join();
 
-  // Now launch the second task with the same executor id but
-  // a different executor command.
-  ExecutorInfo executor2;
-  executor2 = executor1;
-  executor2.mutable_command()->set_value("exit 2");
+  driver2.stop();
+  driver2.join();
 
-  TaskInfo task2 = createTask(
-      offers2.get()[0], executor2.command().value(), executor2.executor_id());
+  Shutdown();
+}
 
-  vector<TaskInfo> tasks2;
-  tasks2.push_back(task2);
 
-  EXPECT_CALL(exec2, registered(_, _, _, _))
+TEST_F(ResourceOffersTest, Request)
+{
+  MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _))
     .Times(1);
 
-  EXPECT_CALL(exec2, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
 
-  Future<TaskStatus> status2;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status2));
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  driver.launchTasks(offers2.get()[0].id(), tasks2);
+  EXPECT_CALL(allocator, frameworkAdded(_, _, _))
+    .Times(1);
 
-  AWAIT_READY(status2);
-  ASSERT_EQ(TASK_RUNNING, status2.get().state());
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
 
-  EXPECT_CALL(exec1, shutdown(_))
-    .Times(AtMost(1));
+  driver.start();
 
-  EXPECT_CALL(exec2, shutdown(_))
-    .Times(AtMost(1));
+  AWAIT_READY(registered);
+
+  vector<Request> sent;
+  Request request;
+  request.mutable_slave_id()->set_value("test");
+  sent.push_back(request);
+
+  Future<vector<Request>> received;
+  EXPECT_CALL(allocator, resourcesRequested(_, _))
+    .WillOnce(FutureArg<1>(&received));
+
+  driver.requestResources(sent);
+
+  AWAIT_READY(received);
+  EXPECT_EQ(sent.size(), received.get().size());
+  EXPECT_NE(0u, received.get().size());
+  EXPECT_EQ(request.slave_id(), received.get()[0].slave_id());
+
+  EXPECT_CALL(allocator, frameworkDeactivated(_))
+    .Times(AtMost(1)); // Races with shutting down the cluster.
+
+  EXPECT_CALL(allocator, frameworkRemoved(_))
+    .Times(AtMost(1)); // Races with shutting down the cluster.
 
   driver.stop();
   driver.join();

Reply via email to