Repository: mesos Updated Branches: refs/heads/master a9312c237 -> 50696fa2f
Added tests for unset framework pid in the slave. Since we do not yet have HTTP schedulers, this adds tests that spoof empty pids coming from the master. Review: https://reviews.apache.org/r/36761 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/50696fa2 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/50696fa2 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/50696fa2 Branch: refs/heads/master Commit: 50696fa2fa04bde042d992c8f9f2bd6020b79f4e Parents: 9172a5f Author: Benjamin Mahler <[email protected]> Authored: Thu Jul 23 18:07:31 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Jul 24 16:25:44 2015 -0700 ---------------------------------------------------------------------- src/tests/mesos.hpp | 8 +- src/tests/slave_tests.cpp | 275 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 281 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/50696fa2/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 1759d7e..7538c96 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -1303,7 +1303,6 @@ const ::testing::Matcher<const std::vector<Offer>& > OfferEq(int cpus, int mem) } -// Definition of the SendStatusUpdateFromTask action to be used with gmock. ACTION_P(SendStatusUpdateFromTask, state) { TaskStatus status; @@ -1313,7 +1312,6 @@ ACTION_P(SendStatusUpdateFromTask, state) } -// Definition of the SendStatusUpdateFromTaskID action to be used with gmock. ACTION_P(SendStatusUpdateFromTaskID, state) { TaskStatus status; @@ -1323,6 +1321,12 @@ ACTION_P(SendStatusUpdateFromTaskID, state) } +ACTION_P(SendFrameworkMessage, data) +{ + arg0->sendFrameworkMessage(data); +} + + #define FUTURE_PROTOBUF(message, from, to) \ FutureProtobuf(message, from, to) http://git-wip-us.apache.org/repos/asf/mesos/blob/50696fa2/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 64cef6e..e086817 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -2386,6 +2386,281 @@ TEST_F(SlaveTest, CheckpointedResourcesIncludedInUsage) Shutdown(); } + +// Ensures that the slave correctly handles a framework without +// a pid, which will be the case for HTTP schedulers. In +// particular, executor messages should be routed through the +// master. +TEST_F(SlaveTest, HTTPScheduler) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + Try<PID<Slave>> slave = StartSlave(&exec); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + // Capture the run task message to unset the framework pid. + Future<RunTaskMessage> runTaskMessage = + DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get()); + + driver.start(); + + AWAIT_READY(runTaskMessage); + + EXPECT_CALL(exec, registered(_, _, _, _)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendFrameworkMessage("message")); + + // The slave should forward the message through the master. + Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 = + FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get()); + + // The master should then forward the message to the framework. + Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 = + FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _); + + Future<Nothing> frameworkMessage; + EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message")) + .WillOnce(FutureSatisfy(&frameworkMessage)); + + // Clear the pid in the run task message so that the slave + // thinks this is an HTTP scheduler. + RunTaskMessage spoofed = runTaskMessage.get(); + spoofed.set_pid(""); + + process::post(master.get(), slave.get(), spoofed); + + AWAIT_READY(executorToFrameworkMessage1); + AWAIT_READY(executorToFrameworkMessage2); + + AWAIT_READY(frameworkMessage); + + // Must call shutdown before the mock executor gets deallocated. + Shutdown(); +} + + +// Ensures that the slave correctly handles a framework upgrading +// to HTTP (going from having a pid, to not having a pid). In +// particular, executor messages should be routed through the +// master. +TEST_F(SlaveTest, HTTPSchedulerLiveUpgrade) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + Try<PID<Slave>> slave = StartSlave(&exec); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + Future<Nothing> launchTask; + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(FutureSatisfy(&launchTask)); + + driver.start(); + + AWAIT_READY(frameworkId); + AWAIT_READY(launchTask); + + // Now spoof a live upgrade of the framework by updating + // the framework information to have an empty pid. + UpdateFrameworkMessage updateFrameworkMessage; + updateFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId.get()); + updateFrameworkMessage.set_pid(""); + + process::post(master.get(), slave.get(), updateFrameworkMessage); + + // Send a message from the executor; the slave should forward + // the message through the master. + Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 = + FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get()); + + Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 = + FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _); + + Future<Nothing> frameworkMessage; + EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message")) + .WillOnce(FutureSatisfy(&frameworkMessage)); + + execDriver->sendFrameworkMessage("message"); + + AWAIT_READY(executorToFrameworkMessage1); + AWAIT_READY(executorToFrameworkMessage2); + + AWAIT_READY(frameworkMessage); + + // Must call shutdown before the mock executor gets deallocated. + Shutdown(); +} + + +// Ensures that the slave can restart when there is an empty +// framework pid. Executor messages should go through the +// master (instead of directly to the scheduler!). +TEST_F(SlaveTest, HTTPSchedulerSlaveRestart) +{ + Try<PID<Master> > master = this->StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = this->CreateSlaveFlags(); + + Fetcher fetcher; + + Try<slave::MesosContainerizer*> containerizer = + slave::MesosContainerizer::create(flags, true, &fetcher); + + ASSERT_SOME(containerizer); + + Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags); + ASSERT_SOME(slave); + + // Enable checkpointing for the framework. + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_checkpoint(true); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + FrameworkID frameworkId; + EXPECT_CALL(sched, registered(_, _, _)) + .WillOnce(SaveArg<1>(&frameworkId)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + // Capture the executor information. + Future<Message> registerExecutorMessage = + FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + SlaveID slaveId = offers.get()[0].slave_id(); + + // Capture the run task so that we can unset the framework pid. + Future<RunTaskMessage> runTaskMessage = + DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get()); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillOnce(FutureArg<1>(&status)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + + driver.launchTasks(offers.get()[0].id(), {task}); + + AWAIT_READY(runTaskMessage); + + // Clear the pid in the run task message so that the slave + // thinks this is an HTTP scheduler. + RunTaskMessage spoofedRunTaskMessage = runTaskMessage.get(); + spoofedRunTaskMessage.set_pid(""); + + process::post(master.get(), slave.get(), spoofedRunTaskMessage); + + AWAIT_READY(registerExecutorMessage); + + RegisterExecutorMessage registerExecutor; + registerExecutor.ParseFromString(registerExecutorMessage.get().body); + ExecutorID executorId = registerExecutor.executor_id(); + UPID executorPid = registerExecutorMessage.get().from; + + AWAIT_READY(status); + EXPECT_EQ(TASK_RUNNING, status.get().state()); + + // Restart the slave. + Stop(slave.get()); + + Try<slave::MesosContainerizer*> containerizer2 = + slave::MesosContainerizer::create(flags, true, &fetcher); + + Future<SlaveReregisteredMessage> slaveReregisteredMessage = + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + // Capture this so that we can unset the framework pid. + Future<UpdateFrameworkMessage> updateFrameworkMessage = + DROP_PROTOBUF(UpdateFrameworkMessage(), _, _); + + slave = StartSlave(containerizer2.get(), flags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveReregisteredMessage); + AWAIT_READY(updateFrameworkMessage); + + // Make sure the slave sees an empty framework pid after recovery. + UpdateFrameworkMessage spoofedUpdateFrameworkMessage = + updateFrameworkMessage.get(); + spoofedUpdateFrameworkMessage.set_pid(""); + + process::post(master.get(), slave.get(), spoofedUpdateFrameworkMessage); + + // Spoof a message from the executor, to ensure the slave + // sends it through the master (instead of directly to the + // scheduler driver!). + Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 = + FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get()); + + Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 = + FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _); + + Future<Nothing> frameworkMessage; + EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message")) + .WillOnce(FutureSatisfy(&frameworkMessage)); + + ExecutorToFrameworkMessage executorToFrameworkMessage; + executorToFrameworkMessage.mutable_slave_id()->CopyFrom(slaveId); + executorToFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId); + executorToFrameworkMessage.mutable_executor_id()->CopyFrom(executorId); + executorToFrameworkMessage.set_data("message"); + + process::post(executorPid, slave.get(), executorToFrameworkMessage); + + AWAIT_READY(executorToFrameworkMessage1); + AWAIT_READY(executorToFrameworkMessage2); + AWAIT_READY(frameworkMessage); + + driver.stop(); + driver.join(); + + this->Shutdown(); + + delete containerizer.get(); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
