Implemented the UPDATE Event handler in the scheduler driver. Review: https://reviews.apache.org/r/36499
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/01f6f7b3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/01f6f7b3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/01f6f7b3 Branch: refs/heads/master Commit: 01f6f7b318605a96d42acddf71f1693b3a532483 Parents: 1317938 Author: Benjamin Mahler <[email protected]> Authored: Fri Jul 10 15:13:14 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Thu Jul 16 16:11:44 2015 -0700 ---------------------------------------------------------------------- include/mesos/type_utils.hpp | 3 ++ src/common/type_utils.cpp | 1 + src/sched/sched.cpp | 32 ++++++++++-- src/tests/scheduler_event_call_tests.cpp | 72 +++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/01f6f7b3/include/mesos/type_utils.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp index e7bfe8c..eb7fe25 100644 --- a/include/mesos/type_utils.hpp +++ b/include/mesos/type_utils.hpp @@ -61,6 +61,9 @@ bool operator == (const SlaveInfo& left, const SlaveInfo& right); bool operator == (const Volume& left, const Volume& right); +bool operator == (const TaskStatus& left, const TaskStatus& right); +bool operator != (const TaskStatus& left, const TaskStatus& right); + inline bool operator == (const ContainerID& left, const ContainerID& right) { return left.value() == right.value(); http://git-wip-us.apache.org/repos/asf/mesos/blob/01f6f7b3/src/common/type_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp index 19f79b4..2ad5b4c 100644 --- a/src/common/type_utils.cpp +++ b/src/common/type_utils.cpp @@ -334,6 +334,7 @@ bool operator == (const SlaveInfo& left, const SlaveInfo& right) } +// TODO(bmahler): Use SerializeToString here? bool operator == (const TaskStatus& left, const TaskStatus& right) { return left.task_id() == right.task_id() && http://git-wip-us.apache.org/repos/asf/mesos/blob/01f6f7b3/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index fa005c6..839fcbf 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -478,7 +478,33 @@ protected: break; } - drop(event, "Unimplemented"); + const TaskStatus& status = event.update().status(); + + // Create a StatusUpdate based on the TaskStatus. + StatusUpdate update; + update.mutable_framework_id()->CopyFrom(framework.id()); + update.mutable_status()->CopyFrom(status); + update.set_timestamp(status.timestamp()); + + if (status.has_executor_id()) { + update.mutable_executor_id()->CopyFrom(status.executor_id()); + } + + if (status.has_slave_id()) { + update.mutable_slave_id()->CopyFrom(status.slave_id()); + } + + if (status.has_uuid()) { + update.set_uuid(status.uuid()); + } + + // Note that we do not need to set the 'pid' now that + // the driver uses 'uuid' absence to skip acknowledgement. + // + // TODO(bmahler): Implement an 'update' method to match + // the Event naming scheme, and have 'statusUpdate' call + // into it. + statusUpdate(from, update, UPID()); break; } @@ -848,8 +874,8 @@ protected: } // See above for when we don't need to acknowledge. - if (update.has_uuid() && update.uuid() != "" && - from != UPID() && pid != UPID()) { + if ((update.has_uuid() && update.uuid() != "") || + (from != UPID() && pid != UPID())) { // We drop updates while we're disconnected. CHECK(connected); CHECK_SOME(master); http://git-wip-us.apache.org/repos/asf/mesos/blob/01f6f7b3/src/tests/scheduler_event_call_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp index 7bb4df2..cf6aa19 100644 --- a/src/tests/scheduler_event_call_tests.cpp +++ b/src/tests/scheduler_event_call_tests.cpp @@ -90,6 +90,78 @@ TEST_F(SchedulerDriverEventTest, Rescind) } +// Ensures the scheduler driver can handle the UPDATE event. +TEST_F(SchedulerDriverEventTest, Update) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<Message> frameworkRegisteredMessage = + FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); + + driver.start(); + + AWAIT_READY(frameworkRegisteredMessage); + UPID frameworkPid = frameworkRegisteredMessage.get().to; + + FrameworkRegisteredMessage message; + ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body)); + + FrameworkID frameworkId = message.framework_id(); + + SlaveID slaveId; + slaveId.set_value("S"); + + TaskID taskId; + taskId.set_value("T"); + + ExecutorID executorId; + executorId.set_value("E"); + + // Generate an update that needs no acknowledgement. + Event event; + event.set_type(Event::UPDATE); + event.mutable_update()->mutable_status()->CopyFrom( + protobuf::createStatusUpdate( + frameworkId, + slaveId, + taskId, + TASK_RUNNING, + TaskStatus::SOURCE_MASTER, + None(), + "message", + None(), + executorId).status()); + + Future<Nothing> statusUpdate; + Future<Nothing> statusUpdate2; + EXPECT_CALL(sched, statusUpdate(&driver, event.update().status())) + .WillOnce(FutureSatisfy(&statusUpdate)) + .WillOnce(FutureSatisfy(&statusUpdate2)); + + process::post(master.get(), frameworkPid, event); + + AWAIT_READY(statusUpdate); + + // Generate an update that requires acknowledgement. + event.mutable_update()->mutable_status()->set_uuid(UUID::random().toBytes()); + + Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgement = + DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); + + process::post(master.get(), frameworkPid, event); + + AWAIT_READY(statusUpdate2); + AWAIT_READY(statusUpdateAcknowledgement); +} + + // Ensures that the driver can handle the MESSAGE event. TEST_F(SchedulerDriverEventTest, Message) {
