Added Message call support to the master and the C++ scheduler library.

Review: https://reviews.apache.org/r/35858


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f5878970
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f5878970
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f5878970

Branch: refs/heads/master
Commit: f5878970a58bed8222a27a2c79aae9b281b01291
Parents: abb5adc
Author: Vinod Kone <[email protected]>
Authored: Wed Jun 24 17:14:41 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Wed Jul 1 17:54:59 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp         |  39 ++++++++++----
 src/master/master.hpp         |   4 ++
 src/scheduler/scheduler.cpp   |  10 ++--
 src/tests/scheduler_tests.cpp | 106 +++++++++++++++++++++++++++++++++++++
 4 files changed, 143 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f5878970/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a72b648..db59831 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1708,7 +1708,11 @@ void Master::receive(
     }
 
     case scheduler::Call::MESSAGE: {
-      drop(from, call, "Unimplemented");
+      if (!call.has_message()) {
+        drop(from, call, "Expecting 'message' to be present");
+        return;
+      }
+      message(framework, call.message());
       break;
     }
 
@@ -3159,11 +3163,26 @@ void Master::schedulerMessage(
     return;
   }
 
-  Slave* slave = slaves.registered.get(slaveId);
+  scheduler::Call::Message message_;
+  message_.mutable_slave_id()->CopyFrom(slaveId);
+  message_.mutable_executor_id()->CopyFrom(executorId);
+  message_.set_data(data);
+
+  message(framework, message_);
+}
+
+
+void Master::message(
+    Framework* framework,
+    const scheduler::Call::Message& message)
+{
+  CHECK_NOTNULL(framework);
+
+  Slave* slave = slaves.registered.get(message.slave_id());
 
   if (slave == NULL) {
     LOG(WARNING) << "Cannot send framework message for framework "
-                 << *framework << " to slave " << slaveId
+                 << *framework << " to slave " << message.slave_id()
                  << " because slave is not registered";
     metrics->invalid_framework_to_executor_messages++;
     return;
@@ -3177,15 +3196,15 @@ void Master::schedulerMessage(
     return;
   }
 
-  LOG(INFO) << "Sending framework message for framework "
+  LOG(INFO) << "Processing MESSAGE call from framework "
             << *framework << " to slave " << *slave;
 
-  FrameworkToExecutorMessage message;
-  message.mutable_slave_id()->MergeFrom(slaveId);
-  message.mutable_framework_id()->MergeFrom(frameworkId);
-  message.mutable_executor_id()->MergeFrom(executorId);
-  message.set_data(data);
-  send(slave->pid, message);
+  FrameworkToExecutorMessage message_;
+  message_.mutable_slave_id()->MergeFrom(message.slave_id());
+  message_.mutable_framework_id()->MergeFrom(framework->id());
+  message_.mutable_executor_id()->MergeFrom(message.executor_id());
+  message_.set_data(message.data());
+  send(slave->pid, message_);
 
   metrics->valid_framework_to_executor_messages++;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f5878970/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6e1772b..5561396 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1037,6 +1037,10 @@ private:
       Framework* framework,
       const scheduler::Call::Reconcile& reconcile);
 
+  void message(
+      Framework* framework,
+      const scheduler::Call::Message& message);
+
   bool elected() const
   {
     return leader.isSome() && leader.get() == info_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/f5878970/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 478ef45..34fa78e 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -311,12 +311,10 @@ public:
           drop(call, "Expecting 'message' to be present");
           return;
         }
-        FrameworkToExecutorMessage message;
-        message.mutable_slave_id()->CopyFrom(call.message().slave_id());
-        message.mutable_framework_id()->CopyFrom(call.framework_id());
-        message.mutable_executor_id()->CopyFrom(call.message().executor_id());
-        message.set_data(call.message().data());
-        send(master.get(), message);
+        // TODO(vinod): Add support for sending the call directly to
+        // the slave, instead of relaying it through the master, as
+        // the scheduler driver does.
+        send(master.get(), call);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f5878970/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 5e2b906..946fa82 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -835,6 +835,112 @@ TEST_F(SchedulerTest, Revive)
 }
 
 
+TEST_F(SchedulerTest, Message)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->set_force(true);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+  FrameworkID id(event.get().subscribed().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Offer offer = event.get().offers().offers(0);
+  TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID);
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    Offer::Operation* operation = accept->add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+  Future<string> data;
+  EXPECT_CALL(exec, frameworkMessage(_, _))
+    .WillOnce(FutureArg<1>(&data));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::MESSAGE);
+
+    Call::Message* message = call.mutable_message();
+    message->mutable_slave_id()->CopyFrom(offer.slave_id());
+    message->mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+    message->set_data("hello world");
+
+    mesos.send(call);
+  }
+
+  AWAIT_ASSERT_EQ("hello world", data);
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.
 

Reply via email to