This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 4036da6  Supported updating framework info in the scheduler driver.
4036da6 is described below

commit 4036da6638427aa05f66f6780ce5d11049f95bb3
Author: Andrei Sekretenko <[email protected]>
AuthorDate: Tue Jun 11 13:31:09 2019 -0400

    Supported updating framework info in the scheduler driver.
    
    This patch adds a method to the MesosSchedulerDriver which updates the
    FrameworkInfo (by making an UPDATE_FRAMEWORK call to the master and also
    updating the FrameworkInfo stored in the driver for the purposes of
    (re-)subscribing).
    
    Review: https://reviews.apache.org/r/70752/
---
 include/mesos/scheduler.hpp |  13 ++++++
 src/sched/sched.cpp         | 111 ++++++++++++++++++++++++++++++++++++++------
 2 files changed, 110 insertions(+), 14 deletions(-)

diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 2ea7cdf..c5e61d4 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -317,6 +317,17 @@ public:
   // currently known.
   virtual Status reconcileTasks(
       const std::vector<TaskStatus>& statuses) = 0;
+
+  // Updates the FrameworkInfo with the provided value (except for the
+  // framework_id field, which should not be set by the caller).
+  // The driver implementation should send the supplied FrameworkInfo update
+  // to the master. Also, all the next re-registration attempts will be
+  // performed with the provided FrameworkInfo.
+  //
+  // NOTE: If the supplied info is invalid or fails authorization,
+  // the `error()` callback will be invoked asynchronously (after
+  // the master replies with a FrameworkErrorMessage).
+  virtual Status updateFramework(const FrameworkInfo& frameworkInfo) = 0;
 };
 
 
@@ -454,6 +465,8 @@ public:
   Status reconcileTasks(
       const std::vector<TaskStatus>& statuses) override;
 
+  Status updateFramework(const FrameworkInfo& frameworkInfo) override;
+
 protected:
   // Used to detect (i.e., choose) the master.
   std::shared_ptr<master::detector::MasterDetector> detector;
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index e77a029..281236b 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -218,6 +218,7 @@ public:
       latch(_latch),
       failover(_framework.has_id() && !framework.id().value().empty()),
       connected(false),
+      sendUpdateFrameworkOnConnect(false),
       running(true),
       detector(_detector),
       flags(_flags),
@@ -748,6 +749,11 @@ protected:
     connected = true;
     failover = false;
 
+    if (sendUpdateFrameworkOnConnect) {
+      sendUpdateFramework();
+    }
+    sendUpdateFrameworkOnConnect = false;
+
     Stopwatch stopwatch;
     if (FLAGS_v >= 1) {
       stopwatch.start();
@@ -790,6 +796,12 @@ protected:
     connected = true;
     failover = false;
 
+    if (sendUpdateFrameworkOnConnect) {
+      sendUpdateFramework();
+    }
+    sendUpdateFrameworkOnConnect = false;
+
+
     Stopwatch stopwatch;
     if (FLAGS_v >= 1) {
       stopwatch.start();
@@ -1583,6 +1595,24 @@ protected:
     send(master->pid(), call);
   }
 
+  void updateFramework(const FrameworkInfo& framework_)
+  {
+    CHECK(!framework_.has_id());
+
+    // Update the FrameworkInfo used for re-registration
+    FrameworkID frameworkId = framework.id();
+    framework = framework_;
+    *framework.mutable_id() = std::move(frameworkId);
+
+    if (connected) {
+      sendUpdateFramework();
+    } else {
+      VLOG(1) << "Postponing UPDATE_FRAMEWORK call:"
+                 " not registered with master";
+      sendUpdateFrameworkOnConnect = true;
+    }
+  }
+
 private:
   friend class mesos::MesosSchedulerDriver;
 
@@ -1626,6 +1656,23 @@ private:
     return static_cast<double>(eventCount<DispatchEvent>());
   }
 
+  void sendUpdateFramework()
+  {
+    Call call;
+
+    CHECK(framework.has_id());
+    *call.mutable_framework_id() = framework.id();
+
+    call.set_type(Call::UPDATE_FRAMEWORK);
+    *call.mutable_update_framework()->mutable_framework_info() = framework;
+
+    VLOG(1) << "Sending UPDATE_FRAMEWORK message";
+
+    CHECK_SOME(master);
+    send(master->pid(), call);
+  }
+
+
   MesosSchedulerDriver* driver;
   Scheduler* scheduler;
   FrameworkInfo framework;
@@ -1638,6 +1685,10 @@ private:
 
   bool connected; // Flag to indicate if framework is registered.
 
+  // Flag to indicate that an UPDATE_FRAMEWORK with a current FrameworkInfo
+  // should be sent after successful (re)connection attempt.
+  bool sendUpdateFrameworkOnConnect;
+
   // TODO(vinod): Instead of 'bool' use 'Status'.
   // We set 'running' to false in SchedulerDriver::stop() and
   // SchedulerDriver::abort() to prevent any further messages from
@@ -1686,6 +1737,25 @@ private:
 } // namespace mesos {
 
 
+void fillMissingFrameworkInfoFields(FrameworkInfo* framework)
+{
+  if (framework->user().empty()) {
+    Result<string> user = os::user();
+    CHECK_SOME(user);
+
+    *framework->mutable_user() = user.get();
+  }
+
+  if (framework->hostname().empty()) {
+    Try<string> hostname = net::hostname();
+
+    if (hostname.isSome()) {
+      *framework->mutable_hostname() = hostname.get();
+    }
+  }
+}
+
+
 void MesosSchedulerDriver::initialize() {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
@@ -1741,20 +1811,7 @@ void MesosSchedulerDriver::initialize() {
   // see if the current user can switch to that user, or via an
   // authentication module ensure this is acceptable.
 
-  // See FrameWorkInfo in include/mesos/mesos.proto:
-  if (framework.user().empty()) {
-    Result<string> user = os::user();
-    CHECK_SOME(user);
-
-    framework.set_user(user.get());
-  }
-
-  if (framework.hostname().empty()) {
-    Try<string> hostname = net::hostname();
-    if (hostname.isSome()) {
-      framework.set_hostname(hostname.get());
-    }
-  }
+  fillMissingFrameworkInfoFields(&framework);
 
   // Launch a local cluster if necessary.
   Option<UPID> pid;
@@ -2275,6 +2332,32 @@ Status MesosSchedulerDriver::reconcileTasks(
   }
 }
 
+Status MesosSchedulerDriver::updateFramework(const FrameworkInfo& update)
+{
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
+
+    if (update.has_id()) {
+      LOG(ERROR) << "MesosSchedulerDriver::updateFramework should not be 
called"
+                 << " with 'FrameworkInfo.id' set, aborting driver";
+      abort();
+      return status;
+    }
+
+    framework = update;
+
+    fillMissingFrameworkInfoFields(&framework);
+
+    CHECK(process != nullptr);
+
+    dispatch(process, &SchedulerProcess::updateFramework, framework);
+
+    return status;
+  }
+}
+
 
 Status MesosSchedulerDriver::requestResources(
     const vector<Request>& requests)

Reply via email to