Updated scheduler driver to send MESSAGE call. Review: https://reviews.apache.org/r/36469
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cf485e29 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cf485e29 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cf485e29 Branch: refs/heads/master Commit: cf485e29413a4d24567f82f9a47c30e59bd2c522 Parents: 5717eab Author: Vinod Kone <[email protected]> Authored: Mon Jul 13 13:03:13 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri Jul 17 10:44:00 2015 -0700 ---------------------------------------------------------------------- src/sched/sched.cpp | 20 +++++-- src/tests/fault_tolerance_tests.cpp | 95 +++++++++++++++++++++++++++++++- src/tests/slave_recovery_tests.cpp | 4 +- 3 files changed, 110 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 613e40b..c563c44 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -1259,6 +1259,8 @@ protected: UPID slave = savedSlavePids[slaveId]; CHECK(slave != UPID()); + // TODO(vinod): Send a Call directly to the slave once that + // support is added. 
FrameworkToExecutorMessage message; message.mutable_slave_id()->MergeFrom(slaveId); message.mutable_framework_id()->MergeFrom(framework.id()); @@ -1269,13 +1271,19 @@ protected: VLOG(1) << "Cannot send directly to slave " << slaveId << "; sending through master"; - FrameworkToExecutorMessage message; - message.mutable_slave_id()->MergeFrom(slaveId); - message.mutable_framework_id()->MergeFrom(framework.id()); - message.mutable_executor_id()->MergeFrom(executorId); - message.set_data(data); + Call call; + + CHECK(framework.has_id()); + call.mutable_framework_id()->CopyFrom(framework.id()); + call.set_type(Call::MESSAGE); + + Call::Message* message = call.mutable_message(); + message->mutable_slave_id()->CopyFrom(slaveId); + message->mutable_executor_id()->CopyFrom(executorId); + message->set_data(data); + CHECK_SOME(master); - send(master.get(), message); + send(master.get(), call); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index f64c797..60ca523 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -18,6 +18,7 @@ #include <gmock/gmock.h> +#include <string> #include <vector> #include <mesos/executor.hpp> @@ -70,6 +71,7 @@ using process::UPID; using process::http::OK; using process::http::Response; +using std::string; using std::vector; using testing::_; @@ -1156,7 +1158,7 @@ TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor) } -TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage) +TEST_F(FaultToleranceTest, SchedulerFailoverExecutorToFrameworkMessage) { Try<PID<Master> > master = StartMaster(); ASSERT_SOME(master); @@ -1260,6 +1262,97 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage) } +TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage) +{ + Try<PID<Master> > 
master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave> > slave = StartSlave(&exec); + ASSERT_SOME(slave); + + MockScheduler sched1; + MesosSchedulerDriver driver1( + &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + FrameworkID frameworkId; + EXPECT_CALL(sched1, registered(&driver1, _, _)) + .WillOnce(SaveArg<1>(&frameworkId)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched1, resourceOffers(&driver1, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver1.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + Future<TaskStatus> status; + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&status)); + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); + + driver1.launchTasks(offers.get()[0].id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(TASK_RUNNING, status.get().state()); + + MockScheduler sched2; + + FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line. 
+ framework2 = DEFAULT_FRAMEWORK_INFO; + framework2.mutable_id()->MergeFrom(frameworkId); + + MesosSchedulerDriver driver2( + &sched2, framework2, master.get(), DEFAULT_CREDENTIAL); + + Future<Nothing> registered; + EXPECT_CALL(sched2, registered(&driver2, frameworkId, _)) + .WillOnce(FutureSatisfy(&registered)); + + Future<Nothing> error; + EXPECT_CALL(sched1, error(&driver1, "Framework failed over")) + .WillOnce(FutureSatisfy(&error)); + + driver2.start(); + + AWAIT_READY(error); + + AWAIT_READY(registered); + + Future<string> frameworkMessage; + EXPECT_CALL(exec, frameworkMessage(_, _)) + .WillOnce(FutureArg<1>(&frameworkMessage)); + + driver2.sendFrameworkMessage( + DEFAULT_EXECUTOR_ID, offers.get()[0].slave_id(), "hello world"); + + AWAIT_EQ("hello world", frameworkMessage); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver1.stop(); + driver2.stop(); + + driver1.join(); + driver2.join(); + + Shutdown(); +} + + // This test verifies that a partitioned framework that still // thinks it is registered with the master cannot kill a task because // the master has re-registered another instance of the framework. http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 7708cf6..ff7aaf9 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -310,13 +310,13 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) .tasks[task.task_id()] .updates.front().uuid()); + const UUID& uuid = UUID::fromBytes(ack.get().acknowledge().uuid()); ASSERT_TRUE(state .frameworks[frameworkId] .executors[executorId] .runs[containerId.get()] .tasks[task.task_id()] - .acks.contains( - UUID::fromBytes(ack.get().acknowledge().uuid()))); + .acks.contains(uuid)); // Shut down the executor manually so that it doesn't hang around // after the test finishes.
