Repository: mesos
Updated Branches:
  refs/heads/master a9312c237 -> 50696fa2f


Added tests for unset framework pid in the slave.

Since we do not yet have HTTP schedulers, this adds tests
that spoof empty pids coming from the master.

Review: https://reviews.apache.org/r/36761


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/50696fa2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/50696fa2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/50696fa2

Branch: refs/heads/master
Commit: 50696fa2fa04bde042d992c8f9f2bd6020b79f4e
Parents: 9172a5f
Author: Benjamin Mahler <[email protected]>
Authored: Thu Jul 23 18:07:31 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Fri Jul 24 16:25:44 2015 -0700

----------------------------------------------------------------------
 src/tests/mesos.hpp       |   8 +-
 src/tests/slave_tests.cpp | 275 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 281 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/50696fa2/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 1759d7e..7538c96 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1303,7 +1303,6 @@ const ::testing::Matcher<const std::vector<Offer>& > OfferEq(int cpus, int mem)
 }
 
 
-// Definition of the SendStatusUpdateFromTask action to be used with gmock.
 ACTION_P(SendStatusUpdateFromTask, state)
 {
   TaskStatus status;
@@ -1313,7 +1312,6 @@ ACTION_P(SendStatusUpdateFromTask, state)
 }
 
 
-// Definition of the SendStatusUpdateFromTaskID action to be used with gmock.
 ACTION_P(SendStatusUpdateFromTaskID, state)
 {
   TaskStatus status;
@@ -1323,6 +1321,12 @@ ACTION_P(SendStatusUpdateFromTaskID, state)
 }
 
 
+ACTION_P(SendFrameworkMessage, data)
+{
+  arg0->sendFrameworkMessage(data); // arg0: the mocked call's first argument.
+}
+
+
 #define FUTURE_PROTOBUF(message, from, to)              \
   FutureProtobuf(message, from, to)
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/50696fa2/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 64cef6e..e086817 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -2386,6 +2386,281 @@ TEST_F(SlaveTest, CheckpointedResourcesIncludedInUsage)
   Shutdown();
 }
 
+
+// Ensures that the slave correctly handles a framework without
+// a pid, which will be the case for HTTP schedulers. In
+// particular, executor messages must be routed through the
+// master rather than sent directly to the scheduler.
+TEST_F(SlaveTest, HTTPScheduler)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Intercept the master's run task message; it is re-sent below.
+  Future<RunTaskMessage> runTaskMessage =
+    DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get());
+
+  driver.start();
+
+  AWAIT_READY(runTaskMessage);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendFrameworkMessage("message"));
+
+  // The slave should forward the executor's message to the master.
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get());
+
+  // The master should then forward the message to the framework.
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _);
+
+  Future<Nothing> frameworkMessage;
+  EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message"))
+    .WillOnce(FutureSatisfy(&frameworkMessage));
+
+  // Re-send the run task message with an empty pid, as if from the
+  // master, so that the slave thinks this is an HTTP scheduler.
+  RunTaskMessage spoofed = runTaskMessage.get();
+  spoofed.set_pid("");
+
+  process::post(master.get(), slave.get(), spoofed);
+
+  AWAIT_READY(executorToFrameworkMessage1);
+  AWAIT_READY(executorToFrameworkMessage2);
+
+  AWAIT_READY(frameworkMessage);
+
+  // Must call shutdown before the mock executor gets deallocated.
+  Shutdown();
+}
+
+
+// Ensures that the slave correctly handles a framework upgrading
+// to HTTP (going from having a pid, to not having a pid). In
+// particular, executor messages sent after the upgrade should be
+// routed through the master.
+TEST_F(SlaveTest, HTTPSchedulerLiveUpgrade)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  ExecutorDriver* execDriver; // Saved when the executor registers.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  Future<Nothing> launchTask;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(FutureSatisfy(&launchTask));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(launchTask);
+
+  // Now spoof a live upgrade of the framework by sending the slave,
+  // on the master's behalf, a framework update with an empty pid.
+  UpdateFrameworkMessage updateFrameworkMessage;
+  updateFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId.get());
+  updateFrameworkMessage.set_pid("");
+
+  process::post(master.get(), slave.get(), updateFrameworkMessage);
+
+  // Send a message from the executor; the slave should forward it
+  // to the master, which in turn forwards it to the framework.
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get());
+
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _);
+
+  Future<Nothing> frameworkMessage;
+  EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message"))
+    .WillOnce(FutureSatisfy(&frameworkMessage));
+
+  execDriver->sendFrameworkMessage("message");
+
+  AWAIT_READY(executorToFrameworkMessage1);
+  AWAIT_READY(executorToFrameworkMessage2);
+
+  AWAIT_READY(frameworkMessage);
+
+  // Must call shutdown before the mock executor gets deallocated.
+  Shutdown();
+}
+
+
+// Ensures that the slave can restart when there is an empty
+// framework pid. Executor messages should go through the
+// master (instead of directly to the scheduler!).
+TEST_F(SlaveTest, HTTPSchedulerSlaveRestart)
+{
+  Try<PID<Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  Try<slave::MesosContainerizer*> containerizer =
+    slave::MesosContainerizer::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer);
+
+  Try<PID<Slave>> slave = this->StartSlave(containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  // Capture the executor's registration to learn its id and pid.
+  Future<Message> registerExecutorMessage =
+    FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size()); // The first offer is indexed below.
+
+  SlaveID slaveId = offers.get()[0].slave_id();
+
+  // Capture the run task so that we can unset the framework pid.
+  Future<RunTaskMessage> runTaskMessage =
+    DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get());
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(runTaskMessage);
+
+  // Clear the pid in the run task message so that the slave
+  // thinks this is an HTTP scheduler.
+  RunTaskMessage spoofedRunTaskMessage = runTaskMessage.get();
+  spoofedRunTaskMessage.set_pid("");
+
+  process::post(master.get(), slave.get(), spoofedRunTaskMessage);
+
+  AWAIT_READY(registerExecutorMessage);
+
+  RegisterExecutorMessage registerExecutor;
+  registerExecutor.ParseFromString(registerExecutorMessage.get().body);
+  ExecutorID executorId = registerExecutor.executor_id();
+  UPID executorPid = registerExecutorMessage.get().from;
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Restart the slave.
+  Stop(slave.get());
+
+  Try<slave::MesosContainerizer*> containerizer2 =
+    slave::MesosContainerizer::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer2);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Capture this so that we can unset the framework pid.
+  Future<UpdateFrameworkMessage> updateFrameworkMessage =
+    DROP_PROTOBUF(UpdateFrameworkMessage(), _, _);
+
+  slave = StartSlave(containerizer2.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveReregisteredMessage);
+  AWAIT_READY(updateFrameworkMessage);
+
+  // Make sure the slave sees an empty framework pid after recovery.
+  UpdateFrameworkMessage spoofedUpdateFrameworkMessage =
+    updateFrameworkMessage.get();
+  spoofedUpdateFrameworkMessage.set_pid("");
+
+  process::post(master.get(), slave.get(), spoofedUpdateFrameworkMessage);
+
+  // Spoof a message from the executor, to ensure the slave
+  // sends it through the master (instead of directly to the
+  // scheduler driver!).
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get());
+
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _);
+
+  Future<Nothing> frameworkMessage;
+  EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message"))
+    .WillOnce(FutureSatisfy(&frameworkMessage));
+
+  ExecutorToFrameworkMessage executorToFrameworkMessage;
+  executorToFrameworkMessage.mutable_slave_id()->CopyFrom(slaveId);
+  executorToFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId);
+  executorToFrameworkMessage.mutable_executor_id()->CopyFrom(executorId);
+  executorToFrameworkMessage.set_data("message");
+
+  process::post(executorPid, slave.get(), executorToFrameworkMessage);
+
+  AWAIT_READY(executorToFrameworkMessage1);
+  AWAIT_READY(executorToFrameworkMessage2);
+  AWAIT_READY(frameworkMessage);
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown();
+  delete containerizer.get();  // Used by the original slave.
+  delete containerizer2.get(); // Used by the restarted slave.
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to