Repository: mesos
Updated Branches:
  refs/heads/master adecbfa6a -> c687237e8


Added Heartbeater to master to send periodic heartbeats to HTTP schedulers.

Review: https://reviews.apache.org/r/37277


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c687237e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c687237e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c687237e

Branch: refs/heads/master
Commit: c687237e8e4e7e40e6c7a9d8ba6f2e7d90dab155
Parents: adecbfa
Author: Vinod Kone <[email protected]>
Authored: Sat Aug 8 19:34:59 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Thu Aug 13 11:08:21 2015 -0700

----------------------------------------------------------------------
 src/internal/evolve.cpp      |   6 +++
 src/internal/evolve.hpp      |   3 ++
 src/master/constants.cpp     |   1 +
 src/master/constants.hpp     |   5 ++
 src/master/master.cpp        |  22 ++++++--
 src/master/master.hpp        | 106 +++++++++++++++++++++++++++++++++-----
 src/tests/http_api_tests.cpp |  46 ++++++++++++++++-
 7 files changed, 170 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index 4678d67..11ce9e7 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -116,6 +116,12 @@ v1::scheduler::Call evolve(const scheduler::Call& call)
 }
 
 
+v1::scheduler::Event evolve(const scheduler::Event& event)
+{
+  return evolve<v1::scheduler::Event>(event);
+}
+
+
 v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message)
 {
   v1::scheduler::Event event;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index 2e03559..13e9f52 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -64,6 +64,9 @@ google::protobuf::RepeatedPtrField<T1> evolve(
 }
 
 
+v1::scheduler::Event evolve(const scheduler::Event& event);
+
+
 // Helper functions that evolve old style internal messages to a
 // v1::scheduler::Event.
 v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message);

http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/master/constants.cpp
----------------------------------------------------------------------
diff --git a/src/master/constants.cpp b/src/master/constants.cpp
index fbcae60..918dd70 100644
--- a/src/master/constants.cpp
+++ b/src/master/constants.cpp
@@ -31,6 +31,7 @@ namespace master {
 const int MAX_OFFERS_PER_FRAMEWORK = 50;
 const double MIN_CPUS = 0.01;
 const Bytes MIN_MEM = Megabytes(32);
+const Duration DEFAULT_HEARTBEAT_INTERVAL = Seconds(15);
 const Duration DEFAULT_SLAVE_PING_TIMEOUT = Seconds(15);
 const size_t DEFAULT_MAX_SLAVE_PING_TIMEOUTS = 5;
 const Duration MIN_SLAVE_REREGISTER_TIMEOUT = Minutes(10);

http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/master/constants.hpp
----------------------------------------------------------------------
diff --git a/src/master/constants.hpp b/src/master/constants.hpp
index 7cec18b..7afa1ec 100644
--- a/src/master/constants.hpp
+++ b/src/master/constants.hpp
@@ -54,6 +54,11 @@ extern const double MIN_CPUS;
 // Minimum amount of memory per offer.
 extern const Bytes MIN_MEM;
 
+
+// Default interval the master uses to send heartbeats to an HTTP
+// scheduler.
+extern const Duration DEFAULT_HEARTBEAT_INTERVAL;
+
 // Amount of time within which a slave PING should be received.
 // NOTE: The slave uses these PING constants to determine when
 // the master has stopped sending pings. If these are made

http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 398203d..83d2f44 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1778,6 +1778,10 @@ void Master::subscribe(
   // TODO(anand): Authenticate the framework.
 
   const FrameworkInfo& frameworkInfo = subscribe.framework_info();
+
+  LOG(INFO) << "Received subscription request for"
+            << " HTTP framework '" << frameworkInfo.name() << "'";
+
   Option<Error> validationError = None();
 
   // TODO(vinod): Deprecate this in favor of ACLs.
@@ -1808,9 +1812,6 @@ void Master::subscribe(
     }
   }
 
-  LOG(INFO) << "Received subscription request for"
-            << " HTTP framework '" << frameworkInfo.name() << "'";
-
   if (validationError.isSome()) {
     LOG(INFO) << "Refusing subscription of framework"
               << " '" << frameworkInfo.name() << "': "
@@ -1889,6 +1890,10 @@ void Master::_subscribe(
     message.mutable_framework_id()->MergeFrom(framework->id());
     message.mutable_master_info()->MergeFrom(info_);
     framework->send(message);
+
+    // Start the heartbeat after sending SUBSCRIBED event.
+    framework->heartbeat();
+
     return;
   }
 
@@ -1934,7 +1939,7 @@ void Master::_subscribe(
       return;
     } else {
       LOG(INFO) << "Allowing framework " << *framework
-                << " to subcribe with an already used id";
+                << " to subscribe with an already used id";
 
       // Convert the framework to an http framework if it was
       // pid based in the past.
@@ -1958,6 +1963,9 @@ void Master::_subscribe(
       message.mutable_framework_id()->MergeFrom(framework->id());
       message.mutable_master_info()->MergeFrom(info_);
       framework->send(message);
+
+      // Start the heartbeat after sending SUBSCRIBED event.
+      framework->heartbeat();
     }
   } else {
     // We don't have a framework with this ID, so we must be a newly
@@ -1986,6 +1994,9 @@ void Master::_subscribe(
     message.mutable_framework_id()->MergeFrom(framework->id());
     message.mutable_master_info()->MergeFrom(info_);
     framework->send(message);
+
+    // Start the heartbeat after sending SUBSCRIBED event.
+    framework->heartbeat();
   }
 
   CHECK(frameworks.registered.contains(frameworkInfo.id()))
@@ -5005,6 +5016,9 @@ void Master::failoverFramework(Framework* framework, const HttpConnection& http)
     .onAny(defer(self(), &Self::exited, framework->id(), http));
 
   _failoverFramework(framework);
+
+  // Start the heartbeat after sending SUBSCRIBED event.
+  framework->heartbeat();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 4e29470..b353b8e 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -586,7 +586,7 @@ protected:
   // the event of a scheduler failover.
   void failoverFramework(Framework* framework, const process::UPID& newPid);
 
-  // Replace the scheduler for a framework with a new http connection,
+  // Replace the scheduler for a framework with a new HTTP connection,
   // in the event of a scheduler failover.
   void failoverFramework(Framework* framework, const HttpConnection& http);
 
@@ -1278,6 +1278,47 @@ struct HttpConnection
 };
 
 
+// This process periodically sends heartbeats to a scheduler on the
+// given HTTP connection.
+class Heartbeater : public process::Process<Heartbeater>
+{
+public:
+  Heartbeater(const FrameworkID& _frameworkId,
+              const HttpConnection& _http,
+              const Duration& _interval)
+    : process::ProcessBase(process::ID::generate("heartbeater")),
+      frameworkId(_frameworkId),
+      http(_http),
+      interval(_interval) {}
+
+protected:
+  virtual void initialize() override
+  {
+    heartbeat();
+  }
+
+private:
+  void heartbeat()
+  {
+    // Only send a heartbeat if the connection is not closed.
+    if (http.closed().isPending()) {
+      VLOG(1) << "Sending heartbeat to " << frameworkId;
+
+      scheduler::Event event;
+      event.set_type(scheduler::Event::HEARTBEAT);
+
+      http.send(event);
+    }
+
+    process::delay(interval, self(), &Self::heartbeat);
+  }
+
+  const FrameworkID frameworkId;
+  HttpConnection http;
+  const Duration interval;
+};
+
+
 // Information about a connected or completed framework.
 // TODO(bmahler): Keeping the task and executor information in sync
 // across the Slave and Framework structs is error prone!
@@ -1311,10 +1352,8 @@ struct Framework
 
   ~Framework()
   {
-    if (http.isSome() && connected) {
-      if (!http.get().close()) {
-        LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
-      }
+    if (http.isSome()) {
+      cleanupConnection();
     }
   }
 
@@ -1531,11 +1570,10 @@ struct Framework
 
   void updateConnection(const process::UPID& newPid)
   {
-    // Remove the http connnection if this is a downgrade from
-    // http to pid, note the connection may already be closed.
+    // Cleanup the HTTP connection if this is a downgrade from HTTP
+    // to PID. Note that the connection may already be closed.
     if (http.isSome()) {
-      http.get().close();
-      http = None();
+      cleanupConnection();
     }
 
     // TODO(benh): unlink(oldPid);
@@ -1544,20 +1582,57 @@ struct Framework
 
   void updateConnection(const HttpConnection& newHttp)
   {
-    // Wipe the pid if this is an upgrade from pid to http.
     if (pid.isSome()) {
+      // Wipe the PID if this is an upgrade from PID to HTTP.
       // TODO(benh): unlink(oldPid);
       pid = None();
+    } else {
+      // Cleanup the old HTTP connection.
+      // Note that master creates a new HTTP connection for every
+      // subscribe request, so 'newHttp' should always be different
+      // from 'http'.
+      cleanupConnection();
     }
 
-    // Close the existing connection if it has changed.
-    if (http.isSome() && http.get().writer != newHttp.writer) {
-      http.get().close();
-    }
+    CHECK_NONE(http);
 
     http = newHttp;
   }
 
+  // Closes the connection and stops the heartbeat.
+  // TODO(vinod): Currently 'connected' variable is set separately
+  // from this method. We need to make sure these are in sync.
+  void cleanupConnection()
+  {
+    CHECK_SOME(http);
+
+    if (connected && !http.get().close()) {
+      LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
+    }
+
+    http = None();
+
+    CHECK_SOME(heartbeater);
+
+    terminate(heartbeater.get().get());
+    wait(heartbeater.get().get());
+
+    heartbeater = None();
+  }
+
+  void heartbeat()
+  {
+    CHECK_NONE(heartbeater);
+    CHECK_SOME(http);
+
+    // TODO(vinod): Make heartbeat interval configurable and include
+    // this information in the SUBSCRIBED response.
+    heartbeater =
+      new Heartbeater(info.id(), http.get(), DEFAULT_HEARTBEAT_INTERVAL);
+
+    process::spawn(heartbeater.get().get());
+  }
+
   Master* const master;
 
   FrameworkInfo info;
@@ -1626,6 +1701,9 @@ struct Framework
   Resources totalOfferedResources;
   hashmap<SlaveID, Resources> offeredResources;
 
+  // This is only set for HTTP frameworks.
+  Option<process::Owned<Heartbeater>> heartbeater;
+
 private:
   Framework(const Framework&);              // No copying.
   Framework& operator=(const Framework&); // No assigning.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c687237e/src/tests/http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/http_api_tests.cpp b/src/tests/http_api_tests.cpp
index aef3c4b..1d11d29 100644
--- a/src/tests/http_api_tests.cpp
+++ b/src/tests/http_api_tests.cpp
@@ -21,6 +21,7 @@
 #include <mesos/v1/mesos.hpp>
 #include <mesos/v1/scheduler.hpp>
 
+#include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gtest.hpp>
 #include <process/http.hpp>
@@ -33,11 +34,13 @@
 #include "common/http.hpp"
 #include "common/recordio.hpp"
 
+#include "master/constants.hpp"
 #include "master/master.hpp"
 
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
+using mesos::internal::master::DEFAULT_HEARTBEAT_INTERVAL;
 
 using mesos::internal::master::Master;
 
@@ -46,6 +49,7 @@ using mesos::internal::recordio::Reader;
 using mesos::v1::scheduler::Call;
 using mesos::v1::scheduler::Event;
 
+using process::Clock;
 using process::Future;
 using process::PID;
 
@@ -254,7 +258,8 @@ TEST_P(HttpApiTest, UnsupportedContentMediaType)
 
 
 // This test verifies if the scheduler is able to receive a Subscribed
-// event on the stream in response to a Subscribe call request.
+// event and heartbeat events on the stream in response to a Subscribe
+// call request.
 TEST_P(HttpApiTest, Subscribe)
 {
   // HTTP schedulers cannot yet authenticate.
@@ -302,6 +307,23 @@ TEST_P(HttpApiTest, Subscribe)
   ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type());
   EXPECT_NE("", event.get().get().subscribed().framework_id().value());
 
+  // Make sure it receives a heartbeat.
+  event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  ASSERT_EQ(Event::HEARTBEAT, event.get().get().type());
+
+  // Advance the clock to receive another heartbeat.
+  Clock::pause();
+  Clock::advance(DEFAULT_HEARTBEAT_INTERVAL);
+
+  event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  ASSERT_EQ(Event::HEARTBEAT, event.get().get().type());
+
   Shutdown();
 }
 
@@ -387,6 +409,13 @@ TEST_P(HttpApiTest, SubscribedOnRetryWithForce)
     // Check event type is subscribed and the same framework id is set.
     ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type());
     EXPECT_EQ(frameworkId, event.get().get().subscribed().framework_id());
+
+    // Make sure it receives a heartbeat.
+    event = responseDecoder.read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+
+    ASSERT_EQ(Event::HEARTBEAT, event.get().get().type());
   }
 
   Shutdown();
@@ -474,6 +503,13 @@ TEST_P(HttpApiTest, UpdatePidToHttpScheduler)
   EXPECT_EQ(evolve(frameworkId.get()),
             event.get().get().subscribed().framework_id());
 
+  // Make sure it receives a heartbeat.
+  event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  ASSERT_EQ(Event::HEARTBEAT, event.get().get().type());
+
   driver.stop();
   driver.join();
 
@@ -553,6 +589,14 @@ TEST_P(HttpApiTest, UpdatePidToHttpSchedulerWithoutForce)
   // was already connected.
   ASSERT_EQ(Event::ERROR, event.get().get().type());
 
+  // Unsubscribed HTTP framework should not get any heartbeats.
+  Clock::pause();
+  Clock::advance(DEFAULT_HEARTBEAT_INTERVAL);
+  Clock::settle();
+
+  event = responseDecoder.read();
+  ASSERT_TRUE(event.isPending());
+
   driver.stop();
   driver.join();
 

Reply via email to