Repository: mesos
Updated Branches:
  refs/heads/master 5d06b3da0 -> b090b0b4b


Refactored Call message to include Subscribe message.

Review: https://reviews.apache.org/r/36078


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c504c3d3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c504c3d3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c504c3d3

Branch: refs/heads/master
Commit: c504c3d3201085bc9c39fddc917ea1583f32f360
Parents: 6e16807
Author: Vinod Kone <[email protected]>
Authored: Tue Jun 30 11:20:17 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Wed Jul 1 17:54:57 2015 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto               |  4 +-
 include/mesos/scheduler/scheduler.proto | 52 +++++++++++++++++++-------
 src/examples/event_call_framework.cpp   | 17 +++++++--
 src/master/master.cpp                   |  8 ++--
 src/sched/sched.cpp                     |  3 +-
 src/scheduler/scheduler.cpp             | 49 +++++++++++++++----------
 src/tests/scheduler_tests.cpp           | 55 +++++++++++++++-------------
 7 files changed, 119 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 1168c2e..3dd4a5b 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -113,7 +113,9 @@ message FrameworkInfo {
   optional FrameworkID id = 3;
 
   // The amount of time that the master will wait for the scheduler to
-  // failover before removing the framework.
+  // failover before it tears down the framework by killing all its
+  // tasks/executors. This should be non-zero if a framework expects
+  // to reconnect after a failover and not lose its tasks/executors.
   optional double failover_timeout = 4 [default = 0.0];
 
   // If set, framework pid, executor pids and status updates are

http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 249ec53..a027da2 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -144,7 +144,7 @@ message Call {
   // Possible call types, followed by message definitions if
   // applicable.
   enum Type {
-    SUBSCRIBE = 1;   // See 'framework_info' below.
+    SUBSCRIBE = 1;   // See 'Subscribe' below.
     TEARDOWN = 2;    // Shuts down all tasks/executors and removes framework.
     ACCEPT = 3;      // See 'Accept' below.
     DECLINE = 4;     // See 'Decline' below.
@@ -164,6 +164,28 @@ message Call {
     // something that is not an issue with the Event/Call API.
   }
 
+  // Subscribes the scheduler with the master to receive events. A
+  // scheduler must send other calls only after it has received the
+  // SUBSCRIBED event.
+  message Subscribe {
+    // See the comments below on 'framework_id' on the semantics for
+    // 'framework_info.id'.
+    required FrameworkInfo framework_info = 1;
+
+    // 'force' tells the master what to do when an instance of the
+    // scheduler attempts to subscribe while another instance of it is
+    // already connected (e.g., split brain due to a network partition);
+    // it only matters when 'framework_info.id' is set. If true, the
+    // newly subscribing instance is accepted and the old one is
+    // disconnected; if false (the default when unset), the subscription
+    // is refused in favor of the already connected instance. Set this
+    // to true only when an instance has just been elected, not when it
+    // is retrying subscription (disconnection or master failover; see
+    // sched/sched.cpp for an example). This is 'optional' rather than
+    // 'required' so it can evolve without breaking wire compatibility.
+    optional bool force = 2;
+  }
+
   // Accepts an offer, performing the specified operations
   // in a sequential manner.
   //
@@ -260,22 +282,24 @@ message Call {
     required bytes data = 3;
   }
 
-  // Identifies who generated this call. Always necessary, but the
-  // only thing that needs to be set for certain calls, e.g.,
-  // SUBSCRIBE and TEARDOWN. 'framework_info.id()' must be always
-  // set except when a brand new scheduler SUBSCRIBEs for the very
-  // first time.
-  required FrameworkInfo framework_info = 1;
+  // Identifies who generated this call. Master assigns a framework id
+  // when a new scheduler subscribes for the first time. Once assigned,
+  // the scheduler must set the 'framework_id' here and within its
+  // FrameworkInfo (in any further 'Subscribe' calls). This allows the
+  // master to identify a scheduler correctly across disconnections,
+  // failovers, etc.
+  optional FrameworkID framework_id = 1;
 
   // Type of the call, indicates which optional field below should be
   // present if that type has a nested message definition.
   required Type type = 2;
 
-  optional Accept accept = 3;
-  optional Decline decline = 4;
-  optional Kill kill = 5;
-  optional Shutdown shutdown = 6;
-  optional Acknowledge acknowledge = 7;
-  optional Reconcile reconcile = 8;
-  optional Message message = 9;
+  optional Subscribe subscribe = 3;
+  optional Accept accept = 4;
+  optional Decline decline = 5;
+  optional Kill kill = 6;
+  optional Shutdown shutdown = 7;
+  optional Acknowledge acknowledge = 8;
+  optional Reconcile reconcile = 9;
+  optional Message message = 10;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/examples/event_call_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/event_call_framework.cpp b/src/examples/event_call_framework.cpp
index b9de22f..17fdcac 100644
--- a/src/examples/event_call_framework.cpp
+++ b/src/examples/event_call_framework.cpp
@@ -233,7 +233,8 @@ private:
       }
 
       Call call;
-      call.mutable_framework_info()->CopyFrom(framework);
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
       call.set_type(Call::ACCEPT);
 
       Call::Accept* accept = call.mutable_accept();
@@ -260,7 +261,8 @@ private:
 
     if (status.has_uuid()) {
       Call call;
-      call.mutable_framework_info()->CopyFrom(framework);
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
       call.set_type(Call::ACKNOWLEDGE);
 
       Call::Acknowledge* ack = call.mutable_acknowledge();
@@ -297,9 +299,15 @@ private:
     }
 
     Call call;
-    call.mutable_framework_info()->CopyFrom(framework);
+    if (framework.has_id()) {
+      call.mutable_framework_id()->CopyFrom(framework.id());
+    }
     call.set_type(Call::SUBSCRIBE);
 
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(framework);
+    subscribe->set_force(true);
+
     mesos.send(call);
 
     process::delay(Seconds(1),
@@ -310,7 +318,8 @@ private:
   void finalize()
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(framework);
+    CHECK(framework.has_id());
+    call.mutable_framework_id()->CopyFrom(framework.id());
     call.set_type(Call::TEARDOWN);
 
     mesos.send(call);

http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a7486d8..9f9f578 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1584,9 +1584,8 @@ void Master::drop(
   // TODO(bmahler): Increment a metric.
 
   LOG(ERROR) << "Dropping " << call.type() << " call"
-             << " from framework " << call.framework_info().id()
-             << " (" << call.framework_info().name()
-             << ") at " << from << ": " << message;
+             << " from framework " << call.framework_id()
+             << " at " << from << ": " << message;
 }
 
 
@@ -1613,7 +1612,6 @@ void Master::receive(
 {
   // TODO(vinod): Add metrics for calls.
   // TODO(vinod): Implement the unimplemented calls.
-  const FrameworkInfo& frameworkInfo = call.framework_info();
 
   // For SUBSCRIBE call, no need to look up the framework. Therefore,
   // we handle them first and separately from other types of calls.
@@ -1628,7 +1626,7 @@ void Master::receive(
 
   // We consolidate the framework lookup and pid validation logic here
   // because they are common for all the call handlers.
-  Framework* framework = getFramework(frameworkInfo.id());
+  Framework* framework = getFramework(call.framework_id());
 
   if (framework == NULL) {
     drop(from, call, "Framework cannot be found");

http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 7563abb..a748686 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1017,7 +1017,8 @@ protected:
     }
 
     Call message;
-    message.mutable_framework_info()->CopyFrom(framework);
+    CHECK(framework.has_id());
+    message.mutable_framework_id()->CopyFrom(framework.id());
     message.set_type(Call::ACCEPT);
 
     Call::Accept* accept = message.mutable_accept();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index f360e4d..38fd286 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -191,21 +191,20 @@ public:
       return;
     }
 
+    // Only a SUBSCRIBE call may not have set the framework ID.
+    if (call.type() != Call::SUBSCRIBE && !call.has_framework_id()) {
+      drop(call, "Expecting 'framework_id' to be present");
+      return;
+    }
+
     // If no user was specified in FrameworkInfo, use the current user.
-    // TODO(benh): Make FrameworkInfo.user be optional and add a
-    // 'user' to either TaskInfo or CommandInfo.
-    if (call.framework_info().user() == "") {
+    // TODO(benh): Make FrameworkInfo.user be optional.
+    if (call.type() == Call::SUBSCRIBE &&
+        call.subscribe().framework_info().user() == "") {
       Result<string> user = os::user();
       CHECK_SOME(user);
 
-      call.mutable_framework_info()->set_user(user.get());
-    }
-
-    // Only a SUBSCRIBE call may not have set the framework ID.
-    if (call.type() != Call::SUBSCRIBE &&
-        (!call.framework_info().has_id() || call.framework_info().id() == "")) {
-      drop(call, "Call is missing FrameworkInfo.id");
-      return;
+      call.mutable_subscribe()->mutable_framework_info()->set_user(user.get());
     }
 
     if (!call.IsInitialized()) {
@@ -216,14 +215,26 @@ public:
 
     switch (call.type()) {
       case Call::SUBSCRIBE: {
-        if (!call.framework_info().has_id() ||
-            call.framework_info().id() == "") {
+        if (!call.has_subscribe()) {
+          drop(call, "Expecting 'subscribe' to be present");
+          return;
+        }
+
+        const FrameworkInfo frameworkInfo = call.subscribe().framework_info();
+
+        if (!(frameworkInfo.id() == call.framework_id())) {
+          drop(call, "Framework id in the call doesn't match the framework id"
+                     " in the 'subscribe' message");
+          return;
+        }
+
+        if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
           RegisterFrameworkMessage message;
-          message.mutable_framework()->CopyFrom(call.framework_info());
+          message.mutable_framework()->CopyFrom(frameworkInfo);
           send(master.get(), message);
         } else {
           ReregisterFrameworkMessage message;
-          message.mutable_framework()->CopyFrom(call.framework_info());
+          message.mutable_framework()->CopyFrom(frameworkInfo);
           message.set_failover(failover);
           send(master.get(), message);
         }
@@ -241,7 +252,7 @@ public:
           return;
         }
         LaunchTasksMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_framework_id()->CopyFrom(call.framework_id());
         message.mutable_filters()->CopyFrom(call.decline().filters());
         message.mutable_offer_ids()->CopyFrom(call.decline().offer_ids());
         send(master.get(), message);
@@ -259,7 +270,7 @@ public:
 
       case Call::REVIVE: {
         ReviveOffersMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_framework_id()->CopyFrom(call.framework_id());
         send(master.get(), message);
         break;
       }
@@ -288,7 +299,7 @@ public:
           return;
         }
         StatusUpdateAcknowledgementMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_framework_id()->CopyFrom(call.framework_id());
         message.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id());
         message.mutable_task_id()->CopyFrom(call.acknowledge().task_id());
         message.set_uuid(call.acknowledge().uuid());
@@ -313,7 +324,7 @@ public:
         }
         FrameworkToExecutorMessage message;
         message.mutable_slave_id()->CopyFrom(call.message().slave_id());
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_framework_id()->CopyFrom(call.framework_id());
         message.mutable_executor_id()->CopyFrom(call.message().executor_id());
         message.set_data(call.message().data());
         send(master.get(), message);

http://git-wip-us.apache.org/repos/asf/mesos/blob/c504c3d3/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index cbe6c91..5a0c645 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -141,9 +141,12 @@ TEST_F(SchedulerTest, TaskRunning)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
     call.set_type(Call::SUBSCRIBE);
 
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->set_force(true);
+
     mesos.send(call);
   }
 
@@ -184,8 +187,7 @@ TEST_F(SchedulerTest, TaskRunning)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::ACCEPT);
 
     Call::Accept* accept = call.mutable_accept();
@@ -246,9 +248,12 @@ TEST_F(SchedulerTest, ReconcileTask)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
     call.set_type(Call::SUBSCRIBE);
 
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->set_force(true);
+
     mesos.send(call);
   }
 
@@ -274,8 +279,7 @@ TEST_F(SchedulerTest, ReconcileTask)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::ACCEPT);
 
     Call::Accept* accept = call.mutable_accept();
@@ -295,8 +299,7 @@ TEST_F(SchedulerTest, ReconcileTask)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::RECONCILE);
 
     Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
@@ -354,9 +357,12 @@ TEST_F(SchedulerTest, KillTask)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
     call.set_type(Call::SUBSCRIBE);
 
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->set_force(true);
+
     mesos.send(call);
   }
 
@@ -382,8 +388,7 @@ TEST_F(SchedulerTest, KillTask)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::ACCEPT);
 
     Call::Accept* accept = call.mutable_accept();
@@ -404,8 +409,7 @@ TEST_F(SchedulerTest, KillTask)
   {
     // Acknowledge TASK_RUNNING update.
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::ACKNOWLEDGE);
 
     Call::Acknowledge* acknowledge = call.mutable_acknowledge();
@@ -421,8 +425,7 @@ TEST_F(SchedulerTest, KillTask)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::KILL);
 
     Call::Kill* kill = call.mutable_kill();
@@ -478,9 +481,12 @@ TEST_F(SchedulerTest, ShutdownExecutor)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
     call.set_type(Call::SUBSCRIBE);
 
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->set_force(true);
+
     mesos.send(call);
   }
 
@@ -506,8 +512,7 @@ TEST_F(SchedulerTest, ShutdownExecutor)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::ACCEPT);
 
     Call::Accept* accept = call.mutable_accept();
@@ -531,8 +536,7 @@ TEST_F(SchedulerTest, ShutdownExecutor)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::SHUTDOWN);
 
     Call::Shutdown* shutdown = call.mutable_shutdown();
@@ -590,9 +594,12 @@ TEST_F(SchedulerTest, Teardown)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
     call.set_type(Call::SUBSCRIBE);
 
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->set_force(true);
+
     mesos.send(call);
   }
 
@@ -618,8 +625,7 @@ TEST_F(SchedulerTest, Teardown)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::ACCEPT);
 
     Call::Accept* accept = call.mutable_accept();
@@ -643,8 +649,7 @@ TEST_F(SchedulerTest, Teardown)
 
   {
     Call call;
-    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.mutable_framework_id()->CopyFrom(id);
     call.set_type(Call::TEARDOWN);
 
     mesos.send(call);

Reply via email to