Added Message call support to the master and the C++ scheduler library. Review: https://reviews.apache.org/r/35858
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f5878970 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f5878970 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f5878970 Branch: refs/heads/master Commit: f5878970a58bed8222a27a2c79aae9b281b01291 Parents: abb5adc Author: Vinod Kone <[email protected]> Authored: Wed Jun 24 17:14:41 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Wed Jul 1 17:54:59 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 39 ++++++++++---- src/master/master.hpp | 4 ++ src/scheduler/scheduler.cpp | 10 ++-- src/tests/scheduler_tests.cpp | 106 +++++++++++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f5878970/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index a72b648..db59831 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1708,7 +1708,11 @@ void Master::receive( } case scheduler::Call::MESSAGE: { - drop(from, call, "Unimplemented"); + if (!call.has_message()) { + drop(from, call, "Expecting 'message' to be present"); + return; + } + message(framework, call.message()); break; } @@ -3159,11 +3163,26 @@ void Master::schedulerMessage( return; } - Slave* slave = slaves.registered.get(slaveId); + scheduler::Call::Message message_; + message_.mutable_slave_id()->CopyFrom(slaveId); + message_.mutable_executor_id()->CopyFrom(executorId); + message_.set_data(data); + + message(framework, message_); +} + + +void Master::message( + Framework* framework, + const scheduler::Call::Message& message) +{ + CHECK_NOTNULL(framework); + + Slave* slave = slaves.registered.get(message.slave_id()); if (slave == NULL) { LOG(WARNING) << "Cannot send framework message for framework " - << *framework << " to slave " << slaveId + << *framework << " to slave " << message.slave_id() << " because slave is not registered"; metrics->invalid_framework_to_executor_messages++; return; @@ -3177,15 +3196,15 @@ void Master::schedulerMessage( return; } - LOG(INFO) << "Sending framework message for framework " + LOG(INFO) << "Processing MESSAGE call from framework " << *framework << " to slave " << *slave; - FrameworkToExecutorMessage message; - message.mutable_slave_id()->MergeFrom(slaveId); - message.mutable_framework_id()->MergeFrom(frameworkId); - message.mutable_executor_id()->MergeFrom(executorId); - message.set_data(data); - send(slave->pid, message); + FrameworkToExecutorMessage message_; + message_.mutable_slave_id()->MergeFrom(message.slave_id()); + message_.mutable_framework_id()->MergeFrom(framework->id()); + message_.mutable_executor_id()->MergeFrom(message.executor_id()); + message_.set_data(message.data()); + send(slave->pid, message_); metrics->valid_framework_to_executor_messages++; } http://git-wip-us.apache.org/repos/asf/mesos/blob/f5878970/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 6e1772b..5561396 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1037,6 +1037,10 @@ private: Framework* framework, const scheduler::Call::Reconcile& reconcile); + void message( + Framework* framework, + const scheduler::Call::Message& message); + bool elected() const { return leader.isSome() && leader.get() == info_; http://git-wip-us.apache.org/repos/asf/mesos/blob/f5878970/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index 478ef45..34fa78e 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -311,12 +311,10 @@ public: drop(call, "Expecting 'message' to be present"); return; } - FrameworkToExecutorMessage message; - message.mutable_slave_id()->CopyFrom(call.message().slave_id()); - message.mutable_framework_id()->CopyFrom(call.framework_id()); - message.mutable_executor_id()->CopyFrom(call.message().executor_id()); - message.set_data(call.message().data()); - send(master.get(), message); + // TODO(vinod): Add support for sending the call directly to + // the slave, instead of relaying it through the master, as + // the scheduler driver does. + send(master.get(), call); break; } http://git-wip-us.apache.org/repos/asf/mesos/blob/f5878970/src/tests/scheduler_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp index 5e2b906..946fa82 100644 --- a/src/tests/scheduler_tests.cpp +++ b/src/tests/scheduler_tests.cpp @@ -835,6 +835,112 @@ TEST_F(SchedulerTest, Revive) } +TEST_F(SchedulerTest, Message) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + TestContainerizer containerizer(&exec); + + Try<PID<Slave>> slave = StartSlave(&containerizer); + ASSERT_SOME(slave); + + Callbacks callbacks; + + Future<Nothing> connected; + EXPECT_CALL(callbacks, connected()) + .WillOnce(FutureSatisfy(&connected)); + + scheduler::Mesos mesos( + master.get(), + DEFAULT_CREDENTIAL, + lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); + + AWAIT_READY(connected); + + Queue<Event> events; + + EXPECT_CALL(callbacks, received(_)) + .WillRepeatedly(Enqueue(&events)); + + { + Call call; + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->set_force(true); + + mesos.send(call); + } + + Future<Event> event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); + + FrameworkID id(event.get().subscribed().framework_id()); + + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_NE(0, event.get().offers().offers().size()); + + EXPECT_CALL(exec, registered(_, _, _, _)) + .Times(1); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + Offer offer = event.get().offers().offers(0); + TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID); + + { + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + Offer::Operation* operation = accept->add_operations(); + operation->set_type(Offer::Operation::LAUNCH); + operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); + + mesos.send(call); + } + + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::UPDATE, event.get().type()); + EXPECT_EQ(TASK_RUNNING, event.get().update().status().state()); + + Future<string> data; + EXPECT_CALL(exec, frameworkMessage(_, _)) + .WillOnce(FutureArg<1>(&data)); + + { + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::MESSAGE); + + Call::Message* message = call.mutable_message(); + message->mutable_slave_id()->CopyFrom(offer.slave_id()); + message->mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID); + message->set_data("hello world"); + + mesos.send(call); + } + + AWAIT_ASSERT_EQ("hello world", data); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +} + + // TODO(benh): Write test for sending Call::Acknowledgement through // master to slave when Event::Update was generated locally.
