This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit fcfa3271c0c4fc2d8c6ae7cfd88cd82d26d2e1d1 Author: Andrei Sekretenko <[email protected]> AuthorDate: Wed Sep 16 12:25:19 2020 +0200 Added offer constraints to `MesosSchedulerDriver::updateFramework()`. This patch adds an ability to set V0 framework's offer constraints via the C++ V0 scheduler driver. Review: https://reviews.apache.org/r/72874 --- include/mesos/scheduler.hpp | 22 ++++++++++++------- .../jni/org_apache_mesos_MesosSchedulerDriver.cpp | 3 ++- .../scheduler/mesos_scheduler_driver_impl.cpp | 4 +++- src/sched/sched.cpp | 25 ++++++++++++++++------ src/tests/master/update_framework_tests.cpp | 18 ++++++++-------- src/tests/scheduler_driver_tests.cpp | 2 +- 6 files changed, 48 insertions(+), 26 deletions(-) diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp index 61cc846..1401c34 100644 --- a/include/mesos/scheduler.hpp +++ b/include/mesos/scheduler.hpp @@ -23,6 +23,7 @@ #include <vector> #include <mesos/mesos.hpp> +#include <mesos/scheduler/scheduler.pb.h> // Mesos scheduler interface and scheduler driver. A scheduler is used // to interact with Mesos in order to run distributed computations. @@ -344,14 +345,16 @@ public: virtual Status reconcileTasks( const std::vector<TaskStatus>& statuses) = 0; - // Inform Mesos master about changes to the `FrameworkInfo` and - // the set of suppressed roles. The driver will store the new - // `FrameworkInfo` and the new set of suppressed roles, and all - // subsequent re-registrations will use them. + // Requests Mesos master to change the `FrameworkInfo`, the set of suppressed + // roles and the offer constraints. The driver will store the new + // `FrameworkInfo`, the new set of suppressed roles and the new offer + // constraints, and all subsequent re-registrations will use them. // // NOTE: If the supplied info is invalid or fails authorization, - // the `error()` callback will be invoked asynchronously (after - // the master replies with a `FrameworkErrorMessage`). + // or the supplied offer constraints are not valid, the `error()` callback + // will be invoked asynchronously (after the master replies with a + // `FrameworkErrorMessage`). Note that validity of non-empty (i.e. + // not default-constructed) offer constraints may depend on master flags. // // NOTE: This must be called after initial registration with the // master completes and the `FrameworkID` is assigned. The assigned @@ -362,7 +365,8 @@ public: // during driver initialization. virtual Status updateFramework( const FrameworkInfo& frameworkInfo, - const std::vector<std::string>& suppressedRoles) = 0; + const std::vector<std::string>& suppressedRoles, + ::mesos::scheduler::OfferConstraints&& offerConstraints) = 0; }; @@ -524,7 +528,9 @@ public: Status updateFramework( const FrameworkInfo& frameworkInfo, - const std::vector<std::string>& suppressedRoles) override; + const std::vector<std::string>& suppressedRoles, + ::mesos::scheduler::OfferConstraints&& offerConstraints) + override; protected: // Used to detect (i.e., choose) the master. diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp index 1817bba..4d71765 100644 --- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp +++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp @@ -1086,7 +1086,8 @@ Java_org_apache_mesos_MesosSchedulerDriver_updateFramework( MesosSchedulerDriver* driver = (MesosSchedulerDriver*) env->GetLongField(thiz, __driver); - Status status = driver->updateFramework(frameworkInfo, suppressedRoles); + Status status = driver->updateFramework( + frameworkInfo, suppressedRoles, ::mesos::scheduler::OfferConstraints()); return convert<Status>(env, status); } diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp index 256632a..17260cd 100644 --- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp +++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp @@ -859,7 +859,9 @@ PyObject* MesosSchedulerDriverImpl_updateFramework( return nullptr; } - Status status = self->driver->updateFramework(framework, *suppressedRoles); + Status status = self->driver->updateFramework( + framework, *suppressedRoles, ::mesos::scheduler::OfferConstraints{}); + return PyInt_FromLong(status); } diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 768ce7d..119a062 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -105,6 +105,8 @@ using namespace mesos::scheduler; using google::protobuf::RepeatedPtrField; +using mesos::scheduler::OfferConstraints; + using mesos::master::detector::MasterDetector; using process::Clock; @@ -839,6 +841,7 @@ protected: Call::Subscribe* subscribe = call.mutable_subscribe(); subscribe->mutable_framework_info()->CopyFrom(framework); + *subscribe->mutable_offer_constraints() = offerConstraints; *subscribe->mutable_suppressed_roles() = RepeatedPtrField<string>( suppressedRoles.begin(), suppressedRoles.end()); @@ -1645,7 +1648,8 @@ protected: void updateFramework( const FrameworkInfo& framework_, - set<string>&& suppressedRoles_) + set<string>&& suppressedRoles_, + OfferConstraints&& offerConstraints_) { if (!framework.has_id() || framework.id().value().empty()) { error("MesosSchedulerDriver::updateFramework() must not be called" @@ -1664,6 +1668,7 @@ protected: framework = framework_; suppressedRoles = std::move(suppressedRoles_); + offerConstraints = std::move(offerConstraints_); if (connected) { sendUpdateFramework(); @@ -1725,8 +1730,10 @@ private: *call.mutable_framework_id() = framework.id(); call.set_type(Call::UPDATE_FRAMEWORK); - *call.mutable_update_framework()->mutable_framework_info() = framework; - *call.mutable_update_framework()->mutable_suppressed_roles() = + Call::UpdateFramework* updateFramework = call.mutable_update_framework(); + *updateFramework->mutable_framework_info() = framework; + *updateFramework->mutable_offer_constraints() = offerConstraints; + *updateFramework->mutable_suppressed_roles() = RepeatedPtrField<string>(suppressedRoles.begin(), suppressedRoles.end()); VLOG(1) << "Sending UPDATE_FRAMEWORK message"; @@ -1740,6 +1747,7 @@ private: Scheduler* scheduler; FrameworkInfo framework; set<string> suppressedRoles; + OfferConstraints offerConstraints; std::recursive_mutex* mutex; Latch* latch; @@ -2476,7 +2484,8 @@ Status MesosSchedulerDriver::reconcileTasks( Status MesosSchedulerDriver::updateFramework( const FrameworkInfo& update, - const vector<string>& suppressedRoles_) + const vector<string>& suppressedRoles_, + OfferConstraints&& offerConstraints_) { synchronized (mutex) { if (status != DRIVER_RUNNING) { @@ -2497,8 +2506,12 @@ Status MesosSchedulerDriver::updateFramework( << " " << suppressedRoles_.size() - suppressedRoles.size() << " duplicates " << suppressedRoles_; - dispatch(process, &SchedulerProcess::updateFramework, framework, - std::move(suppressedRoles)); + dispatch( + process, + &SchedulerProcess::updateFramework, + framework, + std::move(suppressedRoles), + std::move(offerConstraints_)); return status; } diff --git a/src/tests/master/update_framework_tests.cpp b/src/tests/master/update_framework_tests.cpp index d6c45f6..3f86573 100644 --- a/src/tests/master/update_framework_tests.cpp +++ b/src/tests/master/update_framework_tests.cpp @@ -1003,7 +1003,7 @@ TEST_F(UpdateFrameworkV0Test, DriverErrorWhenCalledBeforeRegistration) driver.start(); - driver.updateFramework(DEFAULT_FRAMEWORK_INFO, {}); + driver.updateFramework(DEFAULT_FRAMEWORK_INFO, {}, {}); AWAIT_READY(error); EXPECT_EQ(error.get(), @@ -1042,7 +1042,7 @@ TEST_F(UpdateFrameworkV0Test, DriverErrorOnFrameworkIDMismatch) *update.mutable_id() = frameworkId.get(); *update.mutable_id()->mutable_value() += "-deadbeef"; - driver.updateFramework(update, {}); + driver.updateFramework(update, {}, {}); AWAIT_READY(error); EXPECT_EQ( @@ -1084,7 +1084,7 @@ TEST_F(UpdateFrameworkV0Test, CheckpointingChangeFails) FrameworkInfo update = changeAllMutableFields(DEFAULT_FRAMEWORK_INFO); update.set_checkpoint(!update.checkpoint()); *update.mutable_id() = frameworkId.get(); - driver.updateFramework(update, {}); + driver.updateFramework(update, {}, {}); AWAIT_READY(error); EXPECT_TRUE(strings::contains( @@ -1142,7 +1142,7 @@ TEST_F(UpdateFrameworkV0Test, MutableFieldsUpdateSuccessfully) FrameworkInfo update = changeAllMutableFields(DEFAULT_FRAMEWORK_INFO); *update.mutable_id() = frameworkId.get(); - driver.updateFramework(update, {}); + driver.updateFramework(update, {}, {}); AWAIT_READY(updateFrameworkMessage); @@ -1235,7 +1235,7 @@ TEST_F(UpdateFrameworkV0Test, OffersOnAddingRole) update.add_roles("new_role"); *update.mutable_id() = frameworkId.get(); - driver.updateFramework(update, {}); + driver.updateFramework(update, {}, {}); AWAIT_READY(offers); @@ -1295,7 +1295,7 @@ TEST_F(UpdateFrameworkV0Test, RescindOnRemovingRoles) update.clear_roles(); *update.mutable_id() = frameworkId.get(); - driver.updateFramework(update, {}); + driver.updateFramework(update, {}, {}); AWAIT_READY(rescindedOfferId); AWAIT_READY(recoverResources); @@ -1366,7 +1366,7 @@ TEST_F(UpdateFrameworkV0Test, SuppressedRoles) vector<string> suppressedRoles( update.roles().begin(), update.roles().end()); - driver.updateFramework(update, suppressedRoles); + driver.updateFramework(update, suppressedRoles, {}); // Ensure that the allocator processes the update, so that this test // does not rely on Master maintaining an ordering between scheduler API calls @@ -1442,8 +1442,8 @@ TEST_F(UpdateFrameworkV0Test, UnsuppressClearsFilters) vector<string> suppressedRoles( update.roles().begin(), update.roles().end()); - driver.updateFramework(update, suppressedRoles); - driver.updateFramework(update, {}); + driver.updateFramework(update, suppressedRoles, {}); + driver.updateFramework(update, {}, {}); // Now the previously declined agent should be re-offered. Clock::pause(); diff --git a/src/tests/scheduler_driver_tests.cpp b/src/tests/scheduler_driver_tests.cpp index 63d7a3b..b45c367 100644 --- a/src/tests/scheduler_driver_tests.cpp +++ b/src/tests/scheduler_driver_tests.cpp @@ -692,7 +692,7 @@ TEST_F(MesosSchedulerDriverTest, ReviveSingleRole) // In addition to setting a filter, suppress role2 (to check that // reviveOffers() not only removes filters, but also unsuppresses roles). *frameworkInfo.mutable_id() = frameworkId.get(); - driver.updateFramework(frameworkInfo, {"role2"}); + driver.updateFramework(frameworkInfo, {"role2"}, {}); // Wait for updateFramework() to be dispatched to the allocator. // Otherwise, REVIVE might be processed by the allocator before the update.
