Repository: mesos Updated Branches: refs/heads/master 5d06b3da0 -> b090b0b4b
Refactored Call message to include Subscribe message. Review: https://reviews.apache.org/r/36078 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c504c3d3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c504c3d3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c504c3d3 Branch: refs/heads/master Commit: c504c3d3201085bc9c39fddc917ea1583f32f360 Parents: 6e16807 Author: Vinod Kone <[email protected]> Authored: Tue Jun 30 11:20:17 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Wed Jul 1 17:54:57 2015 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 4 +- include/mesos/scheduler/scheduler.proto | 52 +++++++++++++++++++------- src/examples/event_call_framework.cpp | 17 +++++++-- src/master/master.cpp | 8 ++-- src/sched/sched.cpp | 3 +- src/scheduler/scheduler.cpp | 49 +++++++++++++++---------- src/tests/scheduler_tests.cpp | 55 +++++++++++++++------------- 7 files changed, 119 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index 1168c2e..3dd4a5b 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -113,7 +113,9 @@ message FrameworkInfo { optional FrameworkID id = 3; // The amount of time that the master will wait for the scheduler to - // failover before removing the framework. + // failover before it tears down the framework by killing all its + // tasks/executors. This should be non-zero if a framework expects + // to reconnect after a failover and not lose its tasks/executors. 
optional double failover_timeout = 4 [default = 0.0]; // If set, framework pid, executor pids and status updates are http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/include/mesos/scheduler/scheduler.proto ---------------------------------------------------------------------- diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto index 249ec53..a027da2 100644 --- a/include/mesos/scheduler/scheduler.proto +++ b/include/mesos/scheduler/scheduler.proto @@ -144,7 +144,7 @@ message Call { // Possible call types, followed by message definitions if // applicable. enum Type { - SUBSCRIBE = 1; // See 'framework_info' below. + SUBSCRIBE = 1; // See 'Subscribe' below. TEARDOWN = 2; // Shuts down all tasks/executors and removes framework. ACCEPT = 3; // See 'Accept' below. DECLINE = 4; // See 'Decline' below. @@ -164,6 +164,28 @@ message Call { // something that is not an issue with the Event/Call API. } + // Subscribes the scheduler with the master to receive events. A + // scheduler must send other calls only after it has received the + // SUBSCRIBED event. + message Subscribe { + // See the comments below on 'framework_id' on the semantics for + // 'framework_info.id'. + required FrameworkInfo framework_info = 1; + + // 'force' tells the master what to do in case an instance of the + // scheduler attempts to subscribe when another instance of it + // is already connected (e.g., split brain due to network + // partition). If 'force' is true, the newly subscribing scheduler + // instance is allowed and the old connected scheduler instance + // is disconnected. If false, the newly subscribing scheduler + // instance is disallowed subscription in favor of the already + // connected scheduler instance. It is recommended to set this to + // true only when a scheduler instance is (just) elected but not + // when it is retrying subscription (disconnection or master + // failover; see sched/sched.cpp for an example).
+ required bool force = 2; + } + // Accepts an offer, performing the specified operations // in a sequential manner. // @@ -260,22 +282,24 @@ message Call { required bytes data = 3; } - // Identifies who generated this call. Always necessary, but the - // only thing that needs to be set for certain calls, e.g., - // SUBSCRIBE and TEARDOWN. 'framework_info.id()' must be always - // set except when a brand new scheduler SUBSCRIBEs for the very - // first time. - required FrameworkInfo framework_info = 1; + // Identifies who generated this call. Master assigns a framework id + // when a new scheduler subscribes for the first time. Once assigned, + // the scheduler must set the 'framework_id' here and within its + // FrameworkInfo (in any further 'Subscribe' calls). This allows the + // master to identify a scheduler correctly across disconnections, + // failovers, etc. + optional FrameworkID framework_id = 1; // Type of the call, indicates which optional field below should be // present if that type has a nested message definition. 
required Type type = 2; - optional Accept accept = 3; - optional Decline decline = 4; - optional Kill kill = 5; - optional Shutdown shutdown = 6; - optional Acknowledge acknowledge = 7; - optional Reconcile reconcile = 8; - optional Message message = 9; + optional Subscribe subscribe = 3; + optional Accept accept = 4; + optional Decline decline = 5; + optional Kill kill = 6; + optional Shutdown shutdown = 7; + optional Acknowledge acknowledge = 8; + optional Reconcile reconcile = 9; + optional Message message = 10; } http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/examples/event_call_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/event_call_framework.cpp b/src/examples/event_call_framework.cpp index b9de22f..17fdcac 100644 --- a/src/examples/event_call_framework.cpp +++ b/src/examples/event_call_framework.cpp @@ -233,7 +233,8 @@ private: } Call call; - call.mutable_framework_info()->CopyFrom(framework); + CHECK(framework.has_id()); + call.mutable_framework_id()->CopyFrom(framework.id()); call.set_type(Call::ACCEPT); Call::Accept* accept = call.mutable_accept(); @@ -260,7 +261,8 @@ private: if (status.has_uuid()) { Call call; - call.mutable_framework_info()->CopyFrom(framework); + CHECK(framework.has_id()); + call.mutable_framework_id()->CopyFrom(framework.id()); call.set_type(Call::ACKNOWLEDGE); Call::Acknowledge* ack = call.mutable_acknowledge(); @@ -297,9 +299,15 @@ private: } Call call; - call.mutable_framework_info()->CopyFrom(framework); + if (framework.has_id()) { + call.mutable_framework_id()->CopyFrom(framework.id()); + } call.set_type(Call::SUBSCRIBE); + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(framework); + subscribe->set_force(true); + mesos.send(call); process::delay(Seconds(1), @@ -310,7 +318,8 @@ private: void finalize() { Call call; - call.mutable_framework_info()->CopyFrom(framework); + 
CHECK(framework.has_id()); + call.mutable_framework_id()->CopyFrom(framework.id()); call.set_type(Call::TEARDOWN); mesos.send(call); http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index a7486d8..9f9f578 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1584,9 +1584,8 @@ void Master::drop( // TODO(bmahler): Increment a metric. LOG(ERROR) << "Dropping " << call.type() << " call" - << " from framework " << call.framework_info().id() - << " (" << call.framework_info().name() - << ") at " << from << ": " << message; + << " from framework " << call.framework_id() + << " at " << from << ": " << message; } @@ -1613,7 +1612,6 @@ void Master::receive( { // TODO(vinod): Add metrics for calls. // TODO(vinod): Implement the unimplemented calls. - const FrameworkInfo& frameworkInfo = call.framework_info(); // For SUBSCRIBE call, no need to look up the framework. Therefore, // we handle them first and separately from other types of calls. @@ -1628,7 +1626,7 @@ void Master::receive( // We consolidate the framework lookup and pid validation logic here // because they are common for all the call handlers. 
- Framework* framework = getFramework(frameworkInfo.id()); + Framework* framework = getFramework(call.framework_id()); if (framework == NULL) { drop(from, call, "Framework cannot be found"); http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 7563abb..a748686 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -1017,7 +1017,8 @@ protected: } Call message; - message.mutable_framework_info()->CopyFrom(framework); + CHECK(framework.has_id()); + message.mutable_framework_id()->CopyFrom(framework.id()); message.set_type(Call::ACCEPT); Call::Accept* accept = message.mutable_accept(); http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index f360e4d..38fd286 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -191,21 +191,20 @@ public: return; } + // Only a SUBSCRIBE call may not have set the framework ID. + if (call.type() != Call::SUBSCRIBE && !call.has_framework_id()) { + drop(call, "Expecting 'framework_id' to be present"); + return; + } + // If no user was specified in FrameworkInfo, use the current user. - // TODO(benh): Make FrameworkInfo.user be optional and add a - // 'user' to either TaskInfo or CommandInfo. - if (call.framework_info().user() == "") { + // TODO(benh): Make FrameworkInfo.user be optional. + if (call.type() == Call::SUBSCRIBE && + call.subscribe().framework_info().user() == "") { Result<string> user = os::user(); CHECK_SOME(user); - call.mutable_framework_info()->set_user(user.get()); - } - - // Only a SUBSCRIBE call may not have set the framework ID. 
- if (call.type() != Call::SUBSCRIBE && - (!call.framework_info().has_id() || call.framework_info().id() == "")) { - drop(call, "Call is missing FrameworkInfo.id"); - return; + call.mutable_subscribe()->mutable_framework_info()->set_user(user.get()); } if (!call.IsInitialized()) { @@ -216,14 +215,26 @@ public: switch (call.type()) { case Call::SUBSCRIBE: { - if (!call.framework_info().has_id() || - call.framework_info().id() == "") { + if (!call.has_subscribe()) { + drop(call, "Expecting 'subscribe' to be present"); + return; + } + + const FrameworkInfo frameworkInfo = call.subscribe().framework_info(); + + if (!(frameworkInfo.id() == call.framework_id())) { + drop(call, "Framework id in the call doesn't match the framework id" + " in the 'subscribe' message"); + return; + } + + if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { RegisterFrameworkMessage message; - message.mutable_framework()->CopyFrom(call.framework_info()); + message.mutable_framework()->CopyFrom(frameworkInfo); send(master.get(), message); } else { ReregisterFrameworkMessage message; - message.mutable_framework()->CopyFrom(call.framework_info()); + message.mutable_framework()->CopyFrom(frameworkInfo); message.set_failover(failover); send(master.get(), message); } @@ -241,7 +252,7 @@ public: return; } LaunchTasksMessage message; - message.mutable_framework_id()->CopyFrom(call.framework_info().id()); + message.mutable_framework_id()->CopyFrom(call.framework_id()); message.mutable_filters()->CopyFrom(call.decline().filters()); message.mutable_offer_ids()->CopyFrom(call.decline().offer_ids()); send(master.get(), message); @@ -259,7 +270,7 @@ public: case Call::REVIVE: { ReviveOffersMessage message; - message.mutable_framework_id()->CopyFrom(call.framework_info().id()); + message.mutable_framework_id()->CopyFrom(call.framework_id()); send(master.get(), message); break; } @@ -288,7 +299,7 @@ public: return; } StatusUpdateAcknowledgementMessage message; - 
message.mutable_framework_id()->CopyFrom(call.framework_info().id()); + message.mutable_framework_id()->CopyFrom(call.framework_id()); message.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id()); message.mutable_task_id()->CopyFrom(call.acknowledge().task_id()); message.set_uuid(call.acknowledge().uuid()); @@ -313,7 +324,7 @@ public: } FrameworkToExecutorMessage message; message.mutable_slave_id()->CopyFrom(call.message().slave_id()); - message.mutable_framework_id()->CopyFrom(call.framework_info().id()); + message.mutable_framework_id()->CopyFrom(call.framework_id()); message.mutable_executor_id()->CopyFrom(call.message().executor_id()); message.set_data(call.message().data()); send(master.get(), message); http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/tests/scheduler_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp index cbe6c91..5a0c645 100644 --- a/src/tests/scheduler_tests.cpp +++ b/src/tests/scheduler_tests.cpp @@ -141,9 +141,12 @@ TEST_F(SchedulerTest, TaskRunning) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); call.set_type(Call::SUBSCRIBE); + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->set_force(true); + mesos.send(call); } @@ -184,8 +187,7 @@ TEST_F(SchedulerTest, TaskRunning) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::ACCEPT); Call::Accept* accept = call.mutable_accept(); @@ -246,9 +248,12 @@ TEST_F(SchedulerTest, ReconcileTask) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); call.set_type(Call::SUBSCRIBE); + Call::Subscribe* subscribe = call.mutable_subscribe(); + 
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->set_force(true); + mesos.send(call); } @@ -274,8 +279,7 @@ TEST_F(SchedulerTest, ReconcileTask) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::ACCEPT); Call::Accept* accept = call.mutable_accept(); @@ -295,8 +299,7 @@ TEST_F(SchedulerTest, ReconcileTask) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::RECONCILE); Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks(); @@ -354,9 +357,12 @@ TEST_F(SchedulerTest, KillTask) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); call.set_type(Call::SUBSCRIBE); + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->set_force(true); + mesos.send(call); } @@ -382,8 +388,7 @@ TEST_F(SchedulerTest, KillTask) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::ACCEPT); Call::Accept* accept = call.mutable_accept(); @@ -404,8 +409,7 @@ TEST_F(SchedulerTest, KillTask) { // Acknowledge TASK_RUNNING update. 
Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::ACKNOWLEDGE); Call::Acknowledge* acknowledge = call.mutable_acknowledge(); @@ -421,8 +425,7 @@ TEST_F(SchedulerTest, KillTask) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::KILL); Call::Kill* kill = call.mutable_kill(); @@ -478,9 +481,12 @@ TEST_F(SchedulerTest, ShutdownExecutor) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); call.set_type(Call::SUBSCRIBE); + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->set_force(true); + mesos.send(call); } @@ -506,8 +512,7 @@ TEST_F(SchedulerTest, ShutdownExecutor) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::ACCEPT); Call::Accept* accept = call.mutable_accept(); @@ -531,8 +536,7 @@ TEST_F(SchedulerTest, ShutdownExecutor) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::SHUTDOWN); Call::Shutdown* shutdown = call.mutable_shutdown(); @@ -590,9 +594,12 @@ TEST_F(SchedulerTest, Teardown) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); call.set_type(Call::SUBSCRIBE); + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->set_force(true); + mesos.send(call); } @@ -618,8 +625,7 @@ TEST_F(SchedulerTest, Teardown) { Call 
call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::ACCEPT); Call::Accept* accept = call.mutable_accept(); @@ -643,8 +649,7 @@ TEST_F(SchedulerTest, Teardown) { Call call; - call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); - call.mutable_framework_info()->mutable_id()->CopyFrom(id); + call.mutable_framework_id()->CopyFrom(id); call.set_type(Call::TEARDOWN); mesos.send(call);
