Implemented the UPDATE Event handler in the scheduler driver.

Review: https://reviews.apache.org/r/36499


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/01f6f7b3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/01f6f7b3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/01f6f7b3

Branch: refs/heads/master
Commit: 01f6f7b318605a96d42acddf71f1693b3a532483
Parents: 1317938
Author: Benjamin Mahler <[email protected]>
Authored: Fri Jul 10 15:13:14 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Thu Jul 16 16:11:44 2015 -0700

----------------------------------------------------------------------
 include/mesos/type_utils.hpp             |  3 ++
 src/common/type_utils.cpp                |  1 +
 src/sched/sched.cpp                      | 32 ++++++++++--
 src/tests/scheduler_event_call_tests.cpp | 72 +++++++++++++++++++++++++++
 4 files changed, 105 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/01f6f7b3/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index e7bfe8c..eb7fe25 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -61,6 +61,9 @@ bool operator == (const SlaveInfo& left, const SlaveInfo& right);
 bool operator == (const Volume& left, const Volume& right);
 
 
+bool operator == (const TaskStatus& left, const TaskStatus& right);
+bool operator != (const TaskStatus& left, const TaskStatus& right);
+
 inline bool operator == (const ContainerID& left, const ContainerID& right)
 {
   return left.value() == right.value();

http://git-wip-us.apache.org/repos/asf/mesos/blob/01f6f7b3/src/common/type_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index 19f79b4..2ad5b4c 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -334,6 +334,7 @@ bool operator == (const SlaveInfo& left, const SlaveInfo& right)
 }
 
 
+// TODO(bmahler): Use SerializeToString here?
 bool operator == (const TaskStatus& left, const TaskStatus& right)
 {
   return left.task_id() == right.task_id() &&

http://git-wip-us.apache.org/repos/asf/mesos/blob/01f6f7b3/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index fa005c6..839fcbf 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -478,7 +478,33 @@ protected:
           break;
         }
 
-        drop(event, "Unimplemented");
+        const TaskStatus& status = event.update().status();
+
+        // Create a StatusUpdate based on the TaskStatus.
+        StatusUpdate update;
+        update.mutable_framework_id()->CopyFrom(framework.id());
+        update.mutable_status()->CopyFrom(status);
+        update.set_timestamp(status.timestamp());
+
+        if (status.has_executor_id()) {
+          update.mutable_executor_id()->CopyFrom(status.executor_id());
+        }
+
+        if (status.has_slave_id()) {
+          update.mutable_slave_id()->CopyFrom(status.slave_id());
+        }
+
+        if (status.has_uuid()) {
+          update.set_uuid(status.uuid());
+        }
+
+        // Note that we do not need to set the 'pid' now that
+        // the driver uses 'uuid' absence to skip acknowledgement.
+        //
+        // TODO(bmahler): Implement an 'update' method to match
+        // the Event naming scheme, and have 'statusUpdate' call
+        // into it.
+        statusUpdate(from, update, UPID());
         break;
       }
 
@@ -848,8 +874,8 @@ protected:
       }
 
       // See above for when we don't need to acknowledge.
-      if (update.has_uuid() && update.uuid() != "" &&
-          from != UPID() && pid != UPID()) {
+      if ((update.has_uuid() && update.uuid() != "") ||
+          (from != UPID() && pid != UPID())) {
         // We drop updates while we're disconnected.
         CHECK(connected);
         CHECK_SOME(master);

http://git-wip-us.apache.org/repos/asf/mesos/blob/01f6f7b3/src/tests/scheduler_event_call_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp
index 7bb4df2..cf6aa19 100644
--- a/src/tests/scheduler_event_call_tests.cpp
+++ b/src/tests/scheduler_event_call_tests.cpp
@@ -90,6 +90,78 @@ TEST_F(SchedulerDriverEventTest, Rescind)
 }
 
 
+// Ensures the scheduler driver can handle the UPDATE event.
+TEST_F(SchedulerDriverEventTest, Update)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<Message> frameworkRegisteredMessage =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  driver.start();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+  UPID frameworkPid = frameworkRegisteredMessage.get().to;
+
+  FrameworkRegisteredMessage message;
+  ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body));
+
+  FrameworkID frameworkId = message.framework_id();
+
+  SlaveID slaveId;
+  slaveId.set_value("S");
+
+  TaskID taskId;
+  taskId.set_value("T");
+
+  ExecutorID executorId;
+  executorId.set_value("E");
+
+  // Generate an update that needs no acknowledgement.
+  Event event;
+  event.set_type(Event::UPDATE);
+  event.mutable_update()->mutable_status()->CopyFrom(
+      protobuf::createStatusUpdate(
+          frameworkId,
+          slaveId,
+          taskId,
+          TASK_RUNNING,
+          TaskStatus::SOURCE_MASTER,
+          None(),
+          "message",
+          None(),
+          executorId).status());
+
+  Future<Nothing> statusUpdate;
+  Future<Nothing> statusUpdate2;
+  EXPECT_CALL(sched, statusUpdate(&driver, event.update().status()))
+    .WillOnce(FutureSatisfy(&statusUpdate))
+    .WillOnce(FutureSatisfy(&statusUpdate2));
+
+  process::post(master.get(), frameworkPid, event);
+
+  AWAIT_READY(statusUpdate);
+
+  // Generate an update that requires acknowledgement.
+  event.mutable_update()->mutable_status()->set_uuid(UUID::random().toBytes());
+
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgement =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  process::post(master.get(), frameworkPid, event);
+
+  AWAIT_READY(statusUpdate2);
+  AWAIT_READY(statusUpdateAcknowledgement);
+}
+
+
 // Ensures that the driver can handle the MESSAGE event.
 TEST_F(SchedulerDriverEventTest, Message)
 {

Reply via email to