Repository: mesos
Updated Branches:
  refs/heads/master 3544df756 -> e833793cc


Updated Master::exited to handle http frameworks.

Review: https://reviews.apache.org/r/36720


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e833793c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e833793c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e833793c

Branch: refs/heads/master
Commit: e833793ccee339e130e541934e55a67eb63bd2ad
Parents: 3544df7
Author: Anand Mazumdar <[email protected]>
Authored: Tue Aug 4 14:46:22 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Tue Aug 4 15:05:07 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 91 ++++++++++++++++++++++++++++++----------------
 src/master/master.hpp | 11 +++++-
 2 files changed, 70 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e833793c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 87e11d5..5aa0a54 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -107,6 +107,8 @@ using process::Time;
 using process::Timer;
 using process::UPID;
 
+using process::http::Pipe;
+
 using process::metrics::Counter;
 
 namespace mesos {
@@ -959,42 +961,31 @@ void Master::finalize()
 }
 
 
-void Master::exited(const UPID& pid)
+void Master::exited(const FrameworkID& frameworkId, Pipe::Writer writer)
 {
   foreachvalue (Framework* framework, frameworks.registered) {
-    if (framework->pid == pid) {
-      LOG(INFO) << "Framework " << *framework << " disconnected";
-
-      // Disconnect the framework.
-      disconnect(framework);
-
-      // Set 'failoverTimeout' to the default and update only if the
-      // input is valid.
-      Try<Duration> failoverTimeout_ =
-        Duration::create(FrameworkInfo().failover_timeout());
-      CHECK_SOME(failoverTimeout_);
-      Duration failoverTimeout = failoverTimeout_.get();
-
-      failoverTimeout_ =
-        Duration::create(framework->info.failover_timeout());
-      if (failoverTimeout_.isSome()) {
-        failoverTimeout = failoverTimeout_.get();
-      } else {
-        LOG(WARNING) << "Using the default value for 'failover_timeout' because"
-                     << "the input value is invalid: "
-                     << failoverTimeout_.error();
-      }
+    if (framework->http.isSome() && framework->http.get().writer == writer) {
+      CHECK_EQ(frameworkId, framework->id());
+      _exited(framework);
+      return;
+    }
 
-      LOG(INFO) << "Giving framework " << *framework << " "
-                << failoverTimeout << " to failover";
+    // If the framework has reconnected, the writer will not match
+    // above, and we will have a framework with a matching id.
+    if (frameworkId == framework->id()) {
+      LOG(INFO) << "Ignoring disconnection for framework "
+                << *framework << " as it has already reconnected";
+      return;
+    }
+  }
+}
 
-      // Delay dispatching a message to ourselves for the timeout.
-      delay(failoverTimeout,
-            self(),
-            &Master::frameworkFailoverTimeout,
-            framework->id(),
-            framework->reregisteredTime);
 
+void Master::exited(const UPID& pid)
+{
+  foreachvalue (Framework* framework, frameworks.registered) {
+    if (framework->pid == pid) {
+      _exited(framework);
       return;
     }
   }
@@ -1054,6 +1045,44 @@ void Master::exited(const UPID& pid)
 }
 
 
+// Disconnects 'framework' and schedules 'frameworkFailoverTimeout' to run
+// once the framework's failover timeout (or the default) has elapsed.
+void Master::_exited(Framework* framework)
+{
+  LOG(INFO) << "Framework " << *framework << " disconnected";
+
+  // Disconnect the framework.
+  disconnect(framework);
+
+  // Set 'failoverTimeout' to the default and update only if the
+  // input is valid.
+  Try<Duration> failoverTimeout_ =
+    Duration::create(FrameworkInfo().failover_timeout());
+  CHECK_SOME(failoverTimeout_);
+  Duration failoverTimeout = failoverTimeout_.get();
+
+  failoverTimeout_ = Duration::create(framework->info.failover_timeout());
+
+  if (failoverTimeout_.isSome()) {
+    failoverTimeout = failoverTimeout_.get();
+  } else {
+    LOG(WARNING) << "Using the default value for 'failover_timeout' because "
+                 << "the input value is invalid: "
+                 << failoverTimeout_.error();
+  }
+
+  LOG(INFO) << "Giving framework " << *framework << " "
+            << failoverTimeout << " to failover";
+
+  // Delay dispatching a message to ourselves for the timeout.
+  delay(failoverTimeout,
+        self(),
+        &Master::frameworkFailoverTimeout,
+        framework->id(),
+        framework->reregisteredTime);
+}
+
+
 void Master::visit(const MessageEvent& event)
 {
   // There are three cases about the message's UPID with respect to

http://git-wip-us.apache.org/repos/asf/mesos/blob/e833793c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index cd0a5c8..e441749 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -507,10 +507,16 @@ public:
 protected:
   virtual void initialize();
   virtual void finalize();
-  virtual void exited(const process::UPID& pid);
+
   virtual void visit(const process::MessageEvent& event);
   virtual void visit(const process::ExitedEvent& event);
 
+  virtual void exited(const process::UPID& pid);
+  void exited(
+      const FrameworkID& frameworkId,
+      process::http::Pipe::Writer writer);
+  void _exited(Framework* framework);
+
   // Invoked when the message is ready to be executed after
   // being throttled.
   // 'principal' being None indicates it is throttled by
@@ -1270,6 +1276,9 @@ struct Framework
     auto encoder = recordio::Encoder<scheduler::Event>(serialize);
 
     http = Http {writer, encoder};
+
+    http.get().writer.readerClosed().
+      onAny(defer(master->self(), &Master::exited, id(), writer));
   }
 
   ~Framework()

Reply via email to