This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 4036da6 Supported updating framework info in the scheduler driver.
4036da6 is described below
commit 4036da6638427aa05f66f6780ce5d11049f95bb3
Author: Andrei Sekretenko <[email protected]>
AuthorDate: Tue Jun 11 13:31:09 2019 -0400
Supported updating framework info in the scheduler driver.
This patch adds a method to the MesosSchedulerDriver which updates the
FrameworkInfo (by making an UPDATE_FRAMEWORK call to the master and also
updating the FrameworkInfo stored in the driver for the purposes of
(re-)subscribing).
Review: https://reviews.apache.org/r/70752/
---
include/mesos/scheduler.hpp | 13 ++++++
src/sched/sched.cpp | 111 ++++++++++++++++++++++++++++++++++++++------
2 files changed, 110 insertions(+), 14 deletions(-)
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 2ea7cdf..c5e61d4 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -317,6 +317,17 @@ public:
// currently known.
virtual Status reconcileTasks(
const std::vector<TaskStatus>& statuses) = 0;
+
+ // Updates the FrameworkInfo with the provided value (except for the
+ // framework_id field, which should not be set by the caller).
+ // The driver implementation should send the supplied FrameworkInfo update
+ // to the master. Also, all subsequent re-registration attempts will be
+ // performed with the provided FrameworkInfo.
+ //
+ // NOTE: If the supplied info is invalid or fails authorization,
+ // the `error()` callback will be invoked asynchronously (after
+ // the master replies with a FrameworkErrorMessage).
+ virtual Status updateFramework(const FrameworkInfo& frameworkInfo) = 0;
};
@@ -454,6 +465,8 @@ public:
Status reconcileTasks(
const std::vector<TaskStatus>& statuses) override;
+ Status updateFramework(const FrameworkInfo& frameworkInfo) override;
+
protected:
// Used to detect (i.e., choose) the master.
std::shared_ptr<master::detector::MasterDetector> detector;
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index e77a029..281236b 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -218,6 +218,7 @@ public:
latch(_latch),
failover(_framework.has_id() && !framework.id().value().empty()),
connected(false),
+ sendUpdateFrameworkOnConnect(false),
running(true),
detector(_detector),
flags(_flags),
@@ -748,6 +749,11 @@ protected:
connected = true;
failover = false;
+ if (sendUpdateFrameworkOnConnect) {
+ sendUpdateFramework();
+ }
+ sendUpdateFrameworkOnConnect = false;
+
Stopwatch stopwatch;
if (FLAGS_v >= 1) {
stopwatch.start();
@@ -790,6 +796,12 @@ protected:
connected = true;
failover = false;
+ if (sendUpdateFrameworkOnConnect) {
+ sendUpdateFramework();
+ }
+ sendUpdateFrameworkOnConnect = false;
+
+
Stopwatch stopwatch;
if (FLAGS_v >= 1) {
stopwatch.start();
@@ -1583,6 +1595,24 @@ protected:
send(master->pid(), call);
}
+ void updateFramework(const FrameworkInfo& framework_)
+ { // Replaces the stored FrameworkInfo with `framework_` and sends it now or on (re)connect.
+ CHECK(!framework_.has_id()); // Callers must not set the id; it is restored below.
+
+ // Update the FrameworkInfo used for re-registration, keeping the current id.
+ FrameworkID frameworkId = framework.id();
+ framework = framework_;
+ *framework.mutable_id() = std::move(frameworkId);
+
+ if (connected) {
+ sendUpdateFramework();
+ } else {
+ VLOG(1) << "Postponing UPDATE_FRAMEWORK call:"
+ " not registered with master";
+ sendUpdateFrameworkOnConnect = true; // Checked and cleared where `connected` is set to true.
+ }
+ }
+
private:
friend class mesos::MesosSchedulerDriver;
@@ -1626,6 +1656,23 @@ private:
return static_cast<double>(eventCount<DispatchEvent>());
}
+ void sendUpdateFramework()
+ { // Builds an UPDATE_FRAMEWORK call from the stored `framework` and sends it to the master.
+ Call call;
+
+ CHECK(framework.has_id()); // The call is addressed by framework id, so one must be set.
+ *call.mutable_framework_id() = framework.id();
+
+ call.set_type(Call::UPDATE_FRAMEWORK);
+ *call.mutable_update_framework()->mutable_framework_info() = framework;
+
+ VLOG(1) << "Sending UPDATE_FRAMEWORK message";
+
+ CHECK_SOME(master); // NOTE(review): presumably always set while `connected` is true; confirm.
+ send(master->pid(), call);
+ }
+
+
MesosSchedulerDriver* driver;
Scheduler* scheduler;
FrameworkInfo framework;
@@ -1638,6 +1685,10 @@ private:
bool connected; // Flag to indicate if framework is registered.
+ // Flag to indicate that an UPDATE_FRAMEWORK with the current FrameworkInfo
+ // should be sent after a successful (re)connection attempt.
+ bool sendUpdateFrameworkOnConnect;
+
// TODO(vinod): Instead of 'bool' use 'Status'.
// We set 'running' to false in SchedulerDriver::stop() and
// SchedulerDriver::abort() to prevent any further messages from
@@ -1686,6 +1737,25 @@ private:
} // namespace mesos {
+void fillMissingFrameworkInfoFields(FrameworkInfo* framework)
+{ // Fills in `user` and `hostname` on `*framework` when they are empty.
+ if (framework->user().empty()) {
+ Result<string> user = os::user();
+ CHECK_SOME(user); // No fallback for the user: failure to determine it is fatal.
+
+ *framework->mutable_user() = user.get();
+ }
+
+ if (framework->hostname().empty()) {
+ Try<string> hostname = net::hostname();
+
+ if (hostname.isSome()) { // Best effort: hostname stays empty if lookup fails.
+ *framework->mutable_hostname() = hostname.get();
+ }
+ }
+}
+
+
void MesosSchedulerDriver::initialize() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
@@ -1741,20 +1811,7 @@ void MesosSchedulerDriver::initialize() {
// see if the current user can switch to that user, or via an
// authentication module ensure this is acceptable.
- // See FrameWorkInfo in include/mesos/mesos.proto:
- if (framework.user().empty()) {
- Result<string> user = os::user();
- CHECK_SOME(user);
-
- framework.set_user(user.get());
- }
-
- if (framework.hostname().empty()) {
- Try<string> hostname = net::hostname();
- if (hostname.isSome()) {
- framework.set_hostname(hostname.get());
- }
- }
+ fillMissingFrameworkInfoFields(&framework);
// Launch a local cluster if necessary.
Option<UPID> pid;
@@ -2275,6 +2332,32 @@ Status MesosSchedulerDriver::reconcileTasks(
}
}
+Status MesosSchedulerDriver::updateFramework(const FrameworkInfo& update)
+{ // Stores `update` as the driver's FrameworkInfo and forwards it to the scheduler process.
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) { // Only a running driver accepts updates.
+ return status;
+ }
+
+ if (update.has_id()) { // The caller must not set the id; treat it as a fatal misuse.
+ LOG(ERROR) << "MesosSchedulerDriver::updateFramework should not be called"
+ << " with 'FrameworkInfo.id' set, aborting driver";
+ abort();
+ return status;
+ }
+
+ framework = update;
+
+ fillMissingFrameworkInfoFields(&framework); // Same user/hostname defaulting as initialize().
+
+ CHECK(process != nullptr);
+
+ dispatch(process, &SchedulerProcess::updateFramework, framework);
+
+ return status;
+ }
+}
+
Status MesosSchedulerDriver::requestResources(
const vector<Request>& requests)