Updated scheduler driver to send MESSAGE call.

Review: https://reviews.apache.org/r/36469


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cf485e29
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cf485e29
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cf485e29

Branch: refs/heads/master
Commit: cf485e29413a4d24567f82f9a47c30e59bd2c522
Parents: 5717eab
Author: Vinod Kone <[email protected]>
Authored: Mon Jul 13 13:03:13 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Fri Jul 17 10:44:00 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                 | 20 +++++--
 src/tests/fault_tolerance_tests.cpp | 95 +++++++++++++++++++++++++++++++-
 src/tests/slave_recovery_tests.cpp  |  4 +-
 3 files changed, 110 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 613e40b..c563c44 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1259,6 +1259,8 @@ protected:
       UPID slave = savedSlavePids[slaveId];
       CHECK(slave != UPID());
 
+      // TODO(vinod): Send a Call directly to the slave once that
+      // support is added.
       FrameworkToExecutorMessage message;
       message.mutable_slave_id()->MergeFrom(slaveId);
       message.mutable_framework_id()->MergeFrom(framework.id());
@@ -1269,13 +1271,19 @@ protected:
       VLOG(1) << "Cannot send directly to slave " << slaveId
               << "; sending through master";
 
-      FrameworkToExecutorMessage message;
-      message.mutable_slave_id()->MergeFrom(slaveId);
-      message.mutable_framework_id()->MergeFrom(framework.id());
-      message.mutable_executor_id()->MergeFrom(executorId);
-      message.set_data(data);
+      Call call;
+
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+      call.set_type(Call::MESSAGE);
+
+      Call::Message* message = call.mutable_message();
+      message->mutable_slave_id()->CopyFrom(slaveId);
+      message->mutable_executor_id()->CopyFrom(executorId);
+      message->set_data(data);
+
       CHECK_SOME(master);
-      send(master.get(), message);
+      send(master.get(), call);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index f64c797..60ca523 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -18,6 +18,7 @@
 
 #include <gmock/gmock.h>
 
+#include <string>
 #include <vector>
 
 #include <mesos/executor.hpp>
@@ -70,6 +71,7 @@ using process::UPID;
 using process::http::OK;
 using process::http::Response;
 
+using std::string;
 using std::vector;
 
 using testing::_;
@@ -1156,7 +1158,7 @@ TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor)
 }
 
 
-TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
+TEST_F(FaultToleranceTest, SchedulerFailoverExecutorToFrameworkMessage)
 {
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
@@ -1260,6 +1262,97 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
 }
 
 
+TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver1.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+  driver1.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  MockScheduler sched2;
+
+  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+  framework2 = DEFAULT_FRAMEWORK_INFO;
+  framework2.mutable_id()->MergeFrom(frameworkId);
+
+  MesosSchedulerDriver driver2(
+      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  Future<Nothing> error;
+  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
+    .WillOnce(FutureSatisfy(&error));
+
+  driver2.start();
+
+  AWAIT_READY(error);
+
+  AWAIT_READY(registered);
+
+  Future<string> frameworkMessage;
+  EXPECT_CALL(exec, frameworkMessage(_, _))
+    .WillOnce(FutureArg<1>(&frameworkMessage));
+
+  driver2.sendFrameworkMessage(
+      DEFAULT_EXECUTOR_ID, offers.get()[0].slave_id(), "hello world");
+
+  AWAIT_EQ("hello world", frameworkMessage);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver1.stop();
+  driver2.stop();
+
+  driver1.join();
+  driver2.join();
+
+  Shutdown();
+}
+
+
 // This test verifies that a partitioned framework that still
 // thinks it is registered with the master cannot kill a task because
 // the master has re-registered another instance of the framework.

http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 7708cf6..ff7aaf9 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -310,13 +310,13 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
         .tasks[task.task_id()]
         .updates.front().uuid());
 
+  const UUID& uuid = UUID::fromBytes(ack.get().acknowledge().uuid());
   ASSERT_TRUE(state
                 .frameworks[frameworkId]
                 .executors[executorId]
                 .runs[containerId.get()]
                 .tasks[task.task_id()]
-                .acks.contains(
-                    UUID::fromBytes(ack.get().acknowledge().uuid())));
+                .acks.contains(uuid));
 
   // Shut down the executor manually so that it doesn't hang around
   // after the test finishes.

Reply via email to