Repository: mesos Updated Branches: refs/heads/master a6a3dcaa5 -> cb941fefc
Added helpers for converting scheduler messages to Events. Review: https://reviews.apache.org/r/36803 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cb941fef Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cb941fef Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cb941fef Branch: refs/heads/master Commit: cb941fefcf73a557b8c0d25cc616ba75e88a6062 Parents: a6a3dca Author: Vinod Kone <[email protected]> Authored: Fri Jul 24 16:24:54 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri Jul 24 17:10:57 2015 -0700 ---------------------------------------------------------------------- src/common/protobuf_utils.cpp | 155 ++++++++++++++++++++++++++++++++++++- src/common/protobuf_utils.hpp | 19 +++++ src/scheduler/scheduler.cpp | 121 ++++------------------------- 3 files changed, 186 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/cb941fef/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index d900707..90a2461 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -16,6 +16,8 @@ * limitations under the License. */ +#include <mesos/scheduler/scheduler.hpp> + #include <mesos/slave/isolator.hpp> #include <mesos/type_utils.hpp> @@ -33,9 +35,13 @@ using std::string; +using mesos::scheduler::Event; + using mesos::slave::ExecutorLimitation; using mesos::slave::ExecutorRunState; +using process::UPID; + namespace mesos { namespace internal { namespace protobuf { @@ -177,7 +183,7 @@ Option<bool> getTaskHealth(const Task& task) * @return A fully formed `MasterInfo` with the IP/hostname information * as derived from the `UPID`. */ -MasterInfo createMasterInfo(const process::UPID& pid) +MasterInfo createMasterInfo(const UPID& pid) { MasterInfo info; info.set_id(stringify(pid) + "-" + UUID::random().toString()); @@ -240,6 +246,153 @@ ExecutorRunState createExecutorRunState( } } // namespace slave { + +namespace scheduler { + +Event event(const FrameworkRegisteredMessage& message) +{ + Event event; + event.set_type(Event::SUBSCRIBED); + + Event::Subscribed* subscribed = event.mutable_subscribed(); + subscribed->mutable_framework_id()->CopyFrom(message.framework_id()); + + return event; +} + + +Event event(const FrameworkReregisteredMessage& message) +{ + Event event; + event.set_type(Event::SUBSCRIBED); + + Event::Subscribed* subscribed = event.mutable_subscribed(); + subscribed->mutable_framework_id()->CopyFrom(message.framework_id()); + + return event; +} + + +Event event(const ResourceOffersMessage& message) +{ + Event event; + event.set_type(Event::OFFERS); + + Event::Offers* offers = event.mutable_offers(); + offers->mutable_offers()->CopyFrom(message.offers()); + + return event; +} + + +Event event(const RescindResourceOfferMessage& message) +{ + Event event; + event.set_type(Event::RESCIND); + + Event::Rescind* rescind = event.mutable_rescind(); + rescind->mutable_offer_id()->CopyFrom(message.offer_id()); + + return event; +} + + +Event event(const StatusUpdateMessage& message) +{ + Event event; + event.set_type(Event::UPDATE); + + Event::Update* update = event.mutable_update(); + + update->mutable_status()->CopyFrom(message.update().status()); + + if (message.update().has_slave_id()) { + update->mutable_status()->mutable_slave_id()->CopyFrom( + message.update().slave_id()); + } + + if (message.update().has_executor_id()) { + update->mutable_status()->mutable_executor_id()->CopyFrom( + message.update().executor_id()); + } + + update->mutable_status()->set_timestamp(message.update().timestamp()); + + // If the update does not have a 'uuid', it does not need + // acknowledging. However, prior to 0.23.0, the update uuid + // was required and always set. In 0.24.0, we can rely on the + // update uuid check here, until then we must still check for + // this being sent from the driver (from == UPID()) or from + // the master (pid == UPID()). + // + // TODO(bmahler): For the HTTP API, we will have to update the + // master and slave to ensure the 'uuid' in TaskStatus is set + // correctly. + if (!message.update().has_uuid() || message.update().uuid() == "") { + update->mutable_status()->clear_uuid(); + } else if (UPID(message.pid()) == UPID()) { + update->mutable_status()->clear_uuid(); + } else { + update->mutable_status()->set_uuid(message.update().uuid()); + } + + return event; +} + + +Event event(const LostSlaveMessage& message) +{ + Event event; + event.set_type(Event::FAILURE); + + Event::Failure* failure = event.mutable_failure(); + failure->mutable_slave_id()->CopyFrom(message.slave_id()); + + return event; +} + + +Event event(const ExitedExecutorMessage& message) +{ + Event event; + event.set_type(Event::FAILURE); + + Event::Failure* failure = event.mutable_failure(); + failure->mutable_slave_id()->CopyFrom(message.slave_id()); + failure->mutable_executor_id()->CopyFrom(message.executor_id()); + failure->set_status(message.status()); + + return event; +} + + +Event event(const ExecutorToFrameworkMessage& message) +{ + Event event; + event.set_type(Event::MESSAGE); + + Event::Message* message_ = event.mutable_message(); + message_->mutable_slave_id()->CopyFrom(message.slave_id()); + message_->mutable_executor_id()->CopyFrom(message.executor_id()); + message_->set_data(message.data()); + + return event; +} + + +Event event(const FrameworkErrorMessage& message) +{ + Event event; + event.set_type(Event::ERROR); + + Event::Error* error = event.mutable_error(); + error->set_message(message.message()); + + return event; +} + +} // namespace scheduler { + } // namespace protobuf { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cb941fef/src/common/protobuf_utils.hpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp index 22046ba..5c99254 100644 --- a/src/common/protobuf_utils.hpp +++ b/src/common/protobuf_utils.hpp @@ -21,6 +21,8 @@ #include <string> +#include <mesos/scheduler/scheduler.hpp> + #include <mesos/slave/isolator.hpp> #include <stout/ip.hpp> @@ -90,6 +92,23 @@ mesos::slave::ExecutorRunState createExecutorRunState( const Option<std::string>& rootfs); } // namespace slave { + +namespace scheduler { + +// Helper functions that create scheduler::Event from a message that +// is sent to the scheduler. +mesos::scheduler::Event event(const FrameworkRegisteredMessage& message); +mesos::scheduler::Event event(const FrameworkReregisteredMessage& message); +mesos::scheduler::Event event(const ResourceOffersMessage& message); +mesos::scheduler::Event event(const RescindResourceOfferMessage& message); +mesos::scheduler::Event event(const StatusUpdateMessage& message); +mesos::scheduler::Event event(const LostSlaveMessage& message); +mesos::scheduler::Event event(const ExitedExecutorMessage& message); +mesos::scheduler::Event event(const ExecutorToFrameworkMessage& message); +mesos::scheduler::Event event(const FrameworkErrorMessage& message); + +} // namespace scheduler { + } // namespace protobuf { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cb941fef/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index badc107..6887ed1 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -61,6 +61,7 @@ #include "authentication/cram_md5/authenticatee.hpp" +#include "common/protobuf_utils.hpp" #include "master/detector.hpp" @@ -549,147 +550,51 @@ protected: void receive(const UPID& from, const FrameworkRegisteredMessage& message) { - subscribed(from, message.framework_id()); - } + failover = false; - void receive(const UPID& from, const FrameworkReregisteredMessage& message) - { - subscribed(from, message.framework_id()); + receive(from, protobuf::scheduler::event(message)); } - void subscribed(const UPID& from, const FrameworkID& frameworkId) + void receive(const UPID& from, const FrameworkReregisteredMessage& message) { - // We've now registered at least once with the master so we're no - // longer failing over. See the comment where 'failover' is - // declared for further details. failover = false; - Event event; - event.set_type(Event::SUBSCRIBED); - - Event::Subscribed* subscribed = event.mutable_subscribed(); - - subscribed->mutable_framework_id()->CopyFrom(frameworkId); - - receive(from, event); + receive(from, protobuf::scheduler::event(message)); } void receive(const UPID& from, const ResourceOffersMessage& message) { - Event event; - event.set_type(Event::OFFERS); - - Event::Offers* offers = event.mutable_offers(); - - offers->mutable_offers()->CopyFrom(message.offers()); - - receive(from, event); + receive(from, protobuf::scheduler::event(message)); } void receive(const UPID& from, const RescindResourceOfferMessage& message) { - Event event; - event.set_type(Event::RESCIND); - - Event::Rescind* rescind = event.mutable_rescind(); - - rescind->mutable_offer_id()->CopyFrom(message.offer_id()); - - receive(from, event); + receive(from, protobuf::scheduler::event(message)); } void receive(const UPID& from, const StatusUpdateMessage& message) { - Event event; - event.set_type(Event::UPDATE); - - Event::Update* update = event.mutable_update(); - - update->mutable_status()->CopyFrom(message.update().status()); - - if (message.update().has_slave_id()) { - update->mutable_status()->mutable_slave_id()->CopyFrom( - message.update().slave_id()); - } - - if (message.update().has_executor_id()) { - update->mutable_status()->mutable_executor_id()->CopyFrom( - message.update().executor_id()); - } - - update->mutable_status()->set_timestamp(message.update().timestamp()); - - // If the update does not have a 'uuid', it does not need - // acknowledging. However, prior to 0.23.0, the update uuid - // was required and always set. In 0.24.0, we can rely on the - // update uuid check here, until then we must still check for - // this being sent from the driver (from == UPID()) or from - // the master (pid == UPID()). - // - // TODO(bmahler): For the HTTP API, we will have to update the - // master and slave to ensure the 'uuid' in TaskStatus is set - // correctly. - if (!message.update().has_uuid() || message.update().uuid() == "") { - update->mutable_status()->clear_uuid(); - } else if (UPID(message.pid()) == UPID()) { - update->mutable_status()->clear_uuid(); - } else { - update->mutable_status()->set_uuid(message.update().uuid()); - } - - receive(from, event); + receive(from, protobuf::scheduler::event(message)); } void receive(const UPID& from, const LostSlaveMessage& message) { - Event event; - event.set_type(Event::FAILURE); - - Event::Failure* failure = event.mutable_failure(); - - failure->mutable_slave_id()->CopyFrom(message.slave_id()); - - receive(from, event); + receive(from, protobuf::scheduler::event(message)); } void receive(const UPID& from, const ExitedExecutorMessage& message) { - Event event; - event.set_type(Event::FAILURE); - - Event::Failure* failure = event.mutable_failure(); - - failure->mutable_slave_id()->CopyFrom(message.slave_id()); - failure->mutable_executor_id()->CopyFrom(message.executor_id()); - failure->set_status(message.status()); - - receive(from, event); + receive(from, protobuf::scheduler::event(message)); } - void receive(const UPID& from, const ExecutorToFrameworkMessage& _message) + void receive(const UPID& from, const ExecutorToFrameworkMessage& message) { - Event event; - event.set_type(Event::MESSAGE); - - Event::Message* message = event.mutable_message(); - - message->mutable_slave_id()->CopyFrom(_message.slave_id()); - message->mutable_executor_id()->CopyFrom(_message.executor_id()); - message->set_data(_message.data()); - - receive(from, event); + receive(from, protobuf::scheduler::event(message)); } void receive(const UPID& from, const FrameworkErrorMessage& message) { - Event event; - event.set_type(Event::ERROR); - - Event::Error* error = event.mutable_error(); - - error->set_message(message.message()); - - receive(from, event); + receive(from, protobuf::scheduler::event(message)); } // Helper for injecting an ERROR event.
