This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit aebf86f97a77da233b523eb7381bf1132a90410c Author: Andrei Sekretenko <[email protected]> AuthorDate: Wed Jul 24 11:02:04 2019 -0400 Added enhanced multi-role capability support to the python bindings. This patch adds a suppressed roles list to the arguments of the scheduler driver constructor and the `reviveOffers()`/`suppressOffers()` methods, and also adds support for the `updateFramework()` method. Review: https://reviews.apache.org/r/71083/ --- .../interface/src/mesos/interface/__init__.py | 53 +++++++--- .../scheduler/mesos_scheduler_driver_impl.cpp | 108 ++++++++++++++++++--- .../scheduler/mesos_scheduler_driver_impl.hpp | 11 ++- 3 files changed, 143 insertions(+), 29 deletions(-) diff --git a/src/python/interface/src/mesos/interface/__init__.py b/src/python/interface/src/mesos/interface/__init__.py index 1200ef6..f986185 100644 --- a/src/python/interface/src/mesos/interface/__init__.py +++ b/src/python/interface/src/mesos/interface/__init__.py @@ -238,28 +238,29 @@ class SchedulerDriver(object): callback. """ - def reviveOffers(self): + def reviveOffers(self, roles=None): """ - Removes all filters previously set by the framework (via launchTasks() - or declineOffer()) and clears the set of suppressed roles. - - NOTE: If the framework is not connected to the master, the set - of suppressed roles stored by the driver will be cleared, and an - up-to-date set of suppressed roles will be sent to the master + Removes filters either for all roles of the framework (if 'roles' + is None) or for the specified roles and removes these roles from + the suppressed set. If the framework is not connected to the master, + an up-to-date set of suppressed roles will be sent to the master during re-registration. + + NOTE: If 'roles' is an empty iterable, this method does nothing. """ - def suppressOffers(self): + def suppressOffers(self, roles=None): """ - Informs Mesos master to stop sending offers to the framework (i.e. - to suppress all roles of the framework). 
To resume getting offers, - the scheduler can call reviveOffers() or set the suppressed roles - explicitly via updateFramework(). + Informs Mesos master to stop sending offers either for all roles + of the framework (if 'roles' is None) or for the specified 'roles' + of the framework (i.e. to suppress these roles). To resume getting + offers, the scheduler can call reviveOffers() or set the suppressed + roles explicitly via updateFramework(). + + NOTE: If the framework is not connected to the master, an up-to-date set + of suppressed roles will be sent to the master during re-registration. - NOTE: If the framework is not connected to the master, all the roles - will be added to the set of suppressed roles in the driver, and an - up-to-date suppressed roles set will be sent to the master during - re-registration. + NOTE: If `roles` is an empty iterable, this method does nothing. """ def acknowledgeStatusUpdate(self, status): @@ -288,6 +289,26 @@ class SchedulerDriver(object): currently known. """ + def updateFramework(self, frameworkInfo, suppressedRoles): + """ + Inform Mesos master about changes to the `FrameworkInfo` and + the set of suppressed roles. The driver will store the new + `FrameworkInfo` and the new set of suppressed roles, and all + subsequent re-registrations will use them. + + NOTE: If the supplied info is invalid or fails authorization, + the `error()` callback will be invoked asynchronously (after + the master replies with a `FrameworkErrorMessage`). + + NOTE: This must be called after initial registration with the + master completes and the `FrameworkID` is assigned. The assigned + `FrameworkID` must be set in `frameworkInfo`. + + NOTE: The `FrameworkInfo.user` and `FrameworkInfo.hostname` + fields will be auto-populated using the same approach used + during driver initialization. + """ + class Executor(object): """ Base class for Mesos executors. 
Users' executors should extend this diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp index 6dec9da..256632a 100644 --- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp +++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp @@ -32,6 +32,7 @@ using std::endl; using std::string; using std::vector; using std::map; +using std::unique_ptr; namespace mesos { namespace python { @@ -138,13 +139,13 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = { }, { "reviveOffers", (PyCFunction) MesosSchedulerDriverImpl_reviveOffers, - METH_NOARGS, - "Remove all filters and ask Mesos for new offers" + METH_VARARGS, + "Remove all filters, unsuppress and ask Mesos for new offers for the roles" }, { "suppressOffers", (PyCFunction) MesosSchedulerDriverImpl_suppressOffers, - METH_NOARGS, - "Set suppressed attribute as true for the Framework" + METH_VARARGS, + "Set suppressed roles for the Framework" }, { "acknowledgeStatusUpdate", (PyCFunction) MesosSchedulerDriverImpl_acknowledgeStatusUpdate, @@ -161,6 +162,11 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = { METH_VARARGS, "Master sends status updates if task status is different from expected" }, + { "updateFramework", + (PyCFunction) MesosSchedulerDriverImpl_updateFramework, + METH_VARARGS, + "Updates FrameworkInfo and suppressed roles" + }, { nullptr } /* Sentinel */ }; @@ -198,15 +204,17 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self, const char* master; int implicitAcknowledgements = 1; // Enabled by default. 
PyObject* credentialObj = nullptr; + PyObject* suppressedRolesObj = nullptr; if (!PyArg_ParseTuple( args, - "OOs|iO", + "OOs|iOO", &schedulerObj, &frameworkObj, &master, &implicitAcknowledgements, - &credentialObj)) { + &credentialObj, + &suppressedRolesObj)) { return -1; } @@ -234,6 +242,14 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self, } } + unique_ptr<vector<string>> suppressedRoles; + if (suppressedRolesObj != nullptr && suppressedRolesObj != Py_None) { + suppressedRoles = constructFromIterable<string>(suppressedRolesObj); + if (!suppressedRoles) { + // Exception has been set by constructFromIterable + return -1; + } + } if (self->driver != nullptr) { delete self->driver; @@ -251,6 +267,7 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self, self->driver = new MesosSchedulerDriver( self->proxyScheduler, framework, + suppressedRoles ? *suppressedRoles : vector<string>{}, master, implicitAcknowledgements != 0, credential); @@ -258,6 +275,7 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self, self->driver = new MesosSchedulerDriver( self->proxyScheduler, framework, + suppressedRoles ? *suppressedRoles : vector<string>{}, master, implicitAcknowledgements != 0); } @@ -645,27 +663,64 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(MesosSchedulerDriverImpl* self, } -PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self) +PyObject* MesosSchedulerDriverImpl_reviveOffers( + MesosSchedulerDriverImpl* self, + PyObject* args) { + PyObject* rolesObj = nullptr; + if (!PyArg_ParseTuple(args, "|O", &rolesObj)) { + return nullptr; + } + if (self->driver == nullptr) { PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr"); return nullptr; } - Status status = self->driver->reviveOffers(); - return PyInt_FromLong(status); // Sets exception if creating long fails. 
+ Status status; + + if (rolesObj == nullptr || rolesObj == Py_None) { + status = self->driver->reviveOffers(); + } else { + unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj); + if (!roles) { + return nullptr; + } + + status = self->driver->reviveOffers(*roles); + } + + return PyInt_FromLong(status); } PyObject* MesosSchedulerDriverImpl_suppressOffers( - MesosSchedulerDriverImpl* self) + MesosSchedulerDriverImpl* self, + PyObject* args) { + PyObject* rolesObj = nullptr; + if (!PyArg_ParseTuple(args, "|O", &rolesObj)) { + return nullptr; + } + if (self->driver == nullptr) { PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr"); return nullptr; } - Status status = self->driver->suppressOffers(); + Status status; + + if (rolesObj == nullptr || rolesObj == Py_None) { + status = self->driver->suppressOffers(); + } else { + unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj); + if (!roles) { + return nullptr; + } + + status = self->driver->suppressOffers(*roles); + } + return PyInt_FromLong(status); // Sets exception if creating long fails. 
} @@ -778,5 +833,36 @@ PyObject* MesosSchedulerDriverImpl_reconcileTasks( return PyInt_FromLong(status); } + +PyObject* MesosSchedulerDriverImpl_updateFramework( + MesosSchedulerDriverImpl* self, + PyObject* args) +{ + PyObject* frameworkObj = nullptr; + PyObject* suppressedRolesObj = nullptr; + + if (!PyArg_ParseTuple(args, "OO", &frameworkObj, &suppressedRolesObj)) { + return nullptr; + } + + FrameworkInfo framework; + if (!readPythonProtobuf(frameworkObj, &framework)) { + PyErr_Format(PyExc_Exception, + "Could not deserialize Python FrameworkInfo"); + return nullptr; + } + + unique_ptr<vector<string>> suppressedRoles; + suppressedRoles = constructFromIterable<string>(suppressedRolesObj); + if (!suppressedRoles) { + // Exception has been set by constructFromIterable + return nullptr; + } + + Status status = self->driver->updateFramework(framework, *suppressedRoles); + return PyInt_FromLong(status); +} + + } // namespace python { } // namespace mesos { diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp index 8c98d46..7bc0856 100644 --- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp +++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp @@ -111,10 +111,13 @@ PyObject* MesosSchedulerDriverImpl_declineOffer( MesosSchedulerDriverImpl* self, PyObject* args); -PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self); +PyObject* MesosSchedulerDriverImpl_reviveOffers( + MesosSchedulerDriverImpl* self, + PyObject* pyRoles); PyObject* MesosSchedulerDriverImpl_suppressOffers( - MesosSchedulerDriverImpl* self); + MesosSchedulerDriverImpl* self, + PyObject* pyRoles); PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate( MesosSchedulerDriverImpl* self, @@ -128,6 +131,10 @@ PyObject* MesosSchedulerDriverImpl_reconcileTasks( MesosSchedulerDriverImpl* self, PyObject* args); +PyObject* 
MesosSchedulerDriverImpl_updateFramework( + MesosSchedulerDriverImpl* self, + PyObject* args); + } // namespace python { } // namespace mesos {
