Repository: mesos Updated Branches: refs/heads/master 2f92699e1 -> a0fd3491e
Maintenance Primitives: Accept/Decline responses in maintenance/status. Adds a `getInverseOfferStatuses` method to the allocator, which returns some `InverseOfferStatus` objects, grouped by Agent and Framework. Changes the `/maintenance/status` endpoint to return this additional information about draining machines. Review: https://reviews.apache.org/r/38653 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0fd3491 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0fd3491 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0fd3491 Branch: refs/heads/master Commit: a0fd3491e2408119f79f7a98e613c2f5ea99c115 Parents: 2f92699 Author: Joseph Wu <[email protected]> Authored: Wed Sep 23 16:47:19 2015 -0400 Committer: Joris Van Remoortere <[email protected]> Committed: Wed Sep 23 17:18:58 2015 -0400 ---------------------------------------------------------------------- include/mesos/maintenance/maintenance.proto | 13 ++++- include/mesos/master/allocator.hpp | 5 ++ include/mesos/master/allocator.proto | 6 ++ src/common/protobuf_utils.cpp | 8 +++ src/common/protobuf_utils.hpp | 4 ++ src/master/allocator/mesos/allocator.hpp | 20 +++++++ src/master/allocator/mesos/hierarchical.hpp | 27 +++++++++ src/master/http.cpp | 71 +++++++++++++++++------- src/master/master.cpp | 4 ++ src/tests/master_maintenance_tests.cpp | 70 ++++++++++++++++++++++- src/tests/mesos.hpp | 16 ++++++ 11 files changed, 222 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/include/mesos/maintenance/maintenance.proto ---------------------------------------------------------------------- diff --git a/include/mesos/maintenance/maintenance.proto b/include/mesos/maintenance/maintenance.proto index ee01c5d..aaca251 100644 --- a/include/mesos/maintenance/maintenance.proto +++ b/include/mesos/maintenance/maintenance.proto @@ -17,6 +17,7 @@ */ import "mesos/mesos.proto"; +import "mesos/master/allocator.proto"; package mesos.maintenance; @@ -69,9 +70,17 @@ message Schedule { /** * Represents the maintenance status of each machine in the cluster. - * Corresponds to the `MachineInfo.Mode` enumeration. + * The lists correspond to the `MachineInfo.Mode` enumeration. */ message ClusterStatus { - repeated MachineID draining_machines = 1; + message DrainingMachine { + required MachineID id = 1; + + // A list of the most recent responses to inverse offers from frameworks + // running on this draining machine. + repeated master.InverseOfferStatus statuses = 2; + } + + repeated DrainingMachine draining_machines = 1; repeated MachineID down_machines = 2; } http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/include/mesos/master/allocator.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp index 2fd00ca..8100f14 100644 --- a/include/mesos/master/allocator.hpp +++ b/include/mesos/master/allocator.hpp @@ -163,6 +163,11 @@ public: const Option<InverseOfferStatus>& status, const Option<Filters>& filters = None()) = 0; + // Retrieves the status of all inverse offers maintained by the allocator. + virtual process::Future< + hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>> + getInverseOfferStatuses() = 0; + // Informs the Allocator to recover resources that are considered // used by the framework. virtual void recoverResources( http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/include/mesos/master/allocator.proto ---------------------------------------------------------------------- diff --git a/include/mesos/master/allocator.proto b/include/mesos/master/allocator.proto index b42f19d..224da71 100644 --- a/include/mesos/master/allocator.proto +++ b/include/mesos/master/allocator.proto @@ -16,6 +16,8 @@ * limitations under the License. */ +import "mesos/mesos.proto"; + package mesos.master; @@ -54,6 +56,10 @@ message InverseOfferStatus { } required Status status = 1; + required FrameworkID framework_id = 2; + + // Time, since the epoch, when this status was last updated. + required TimeInfo timestamp = 3; // TODO(jmlvanre): Capture decline message. } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index 4dc58fe..c1e8e01 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -221,6 +221,14 @@ Label createLabel(const std::string& key, const std::string& value) return label; } + +TimeInfo getCurrentTime() +{ + TimeInfo timeInfo; + timeInfo.set_nanoseconds(process::Clock::now().duration().ns()); + return timeInfo; +} + namespace slave { ContainerLimitation createContainerLimitation( http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/common/protobuf_utils.hpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp index 3817c6a..8793851 100644 --- a/src/common/protobuf_utils.hpp +++ b/src/common/protobuf_utils.hpp @@ -84,6 +84,10 @@ MasterInfo createMasterInfo(const process::UPID& pid); Label createLabel(const std::string& key, const std::string& value); + +// Helper function that fills in a TimeInfo from the current time. +TimeInfo getCurrentTime(); + namespace slave { mesos::slave::ContainerLimitation createContainerLimitation( http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/allocator/mesos/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp index dca2565..c5375aa 100644 --- a/src/master/allocator/mesos/allocator.hpp +++ b/src/master/allocator/mesos/allocator.hpp @@ -25,6 +25,7 @@ #include <process/future.hpp> #include <process/process.hpp> +#include <stout/hashmap.hpp> #include <stout/try.hpp> namespace mesos { @@ -123,6 +124,10 @@ public: const Option<mesos::master::InverseOfferStatus>& status, const Option<Filters>& filters); + process::Future< + hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>> + getInverseOfferStatuses(); + void recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, @@ -232,6 +237,10 @@ public: const Option<mesos::master::InverseOfferStatus>& status, const Option<Filters>& filters = None()) = 0; + virtual process::Future< + hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>> + getInverseOfferStatuses() = 0; + virtual void recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, @@ -506,6 +515,17 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer( template <typename AllocatorProcess> +inline process::Future< + hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>> + MesosAllocator<AllocatorProcess>::getInverseOfferStatuses() +{ + return process::dispatch( + process, + &MesosAllocatorProcess::getInverseOfferStatuses); +} + + +template <typename AllocatorProcess> inline void MesosAllocator<AllocatorProcess>::recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 4ec08fd..f3a9b9d 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -161,6 +161,10 @@ public: const Option<mesos::master::InverseOfferStatus>& status, const Option<Filters>& filters); + process::Future< + hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>> + getInverseOfferStatuses(); + void recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, @@ -1031,6 +1035,29 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer( template <class RoleSorter, class FrameworkSorter> +process::Future< + hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>> +HierarchicalAllocatorProcess< + RoleSorter, FrameworkSorter>::getInverseOfferStatuses() +{ + CHECK(initialized); + + hashmap< + SlaveID, + hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result; + + // Make a copy of the most recent statuses. + foreachpair (const SlaveID& id, const Slave& slave, slaves) { + if (slave.maintenance.isSome()) { + result[id] = slave.maintenance.get().statuses; + } + } + + return result; +} + + +template <class RoleSorter, class FrameworkSorter> void HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources( const FrameworkID& frameworkId, http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index fb5315c..a92c276 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -62,6 +62,7 @@ #include "logging/logging.hpp" +#include "master/machine.hpp" #include "master/maintenance.hpp" #include "master/master.hpp" #include "master/validation.hpp" @@ -1749,7 +1750,11 @@ const string Master::Http::MAINTENANCE_STATUS_HELP = HELP( TLDR( "Retrieves the maintenance status of the cluster."), DESCRIPTION( - "Returns an object with one list of machines per machine mode.")); + "Returns an object with one list of machines per machine mode.", + "For draining machines, this list includes the frameworks' responses", + "to inverse offers. NOTE: Inverse offer responses are cleared if", + "the master fails over. However, new inverse offers will be sent", + "once the master recovers.")); // /master/maintenance/status endpoint handler. @@ -1759,27 +1764,55 @@ Future<Response> Master::Http::maintenanceStatus(const Request& request) const return BadRequest("Expecting GET, got '" + request.method + "'"); } - // Unwrap the master's machine information into two arrays of machines. - mesos::maintenance::ClusterStatus status; - foreachkey (const MachineID& id, master->machines) { - switch (master->machines[id].info.mode()) { - case MachineInfo::DRAINING: { - status.add_draining_machines()->CopyFrom(id); - break; - } - case MachineInfo::DOWN: { - status.add_down_machines()->CopyFrom(id); - break; - } - // Currently, `UP` machines are not specifically tracked in the master. - case MachineInfo::UP: {} - default: { - break; + return master->allocator->getInverseOfferStatuses() + .then(defer( + master->self(), + [=]( + hashmap< + SlaveID, + hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result) + -> Future<Response> { + + // Unwrap the master's machine information into two arrays of machines. + // The data is coming from the allocator and therefore could be stale. + // Also, if the master fails over, this data is cleared. + mesos::maintenance::ClusterStatus status; + foreachpair (const MachineID& id, const Machine& machine, master->machines) { + switch (machine.info.mode()) { + case MachineInfo::DRAINING: { + mesos::maintenance::ClusterStatus::DrainingMachine* drainingMachine = + status.add_draining_machines(); + + drainingMachine->mutable_id()->CopyFrom(id); + + // Unwrap inverse offer status information from the allocator. + foreach (const SlaveID& slave, machine.slaves) { + if (result.contains(slave)) { + foreachvalue ( + const mesos::master::InverseOfferStatus& status, + result[slave]) { + drainingMachine->add_statuses()->CopyFrom(status); + } + } + } + break; + } + + case MachineInfo::DOWN: { + status.add_down_machines()->CopyFrom(id); + break; + } + + // Currently, `UP` machines are not specifically tracked in the master. + case MachineInfo::UP: {} + default: { + break; + } } } - } - return OK(JSON::Protobuf(status), request.query.get("jsonp")); + return OK(JSON::Protobuf(status), request.query.get("jsonp")); + })); } http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 90ef8c6..5ca1941 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2843,6 +2843,8 @@ void Master::accept( if (inverseOffer != NULL) { mesos::master::InverseOfferStatus status; status.set_status(mesos::master::InverseOfferStatus::ACCEPT); + status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id()); + status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime()); allocator->updateInverseOffer( inverseOffer->slave_id(), @@ -3318,6 +3320,8 @@ void Master::decline( if (inverseOffer != NULL) { // If this is an inverse offer. mesos::master::InverseOfferStatus status; status.set_status(mesos::master::InverseOfferStatus::DECLINE); + status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id()); + status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime()); allocator->updateInverseOffer( inverseOffer->slave_id(), http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/tests/master_maintenance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp index c5277a1..89ad138 100644 --- a/src/tests/master_maintenance_tests.cpp +++ b/src/tests/master_maintenance_tests.cpp @@ -1025,11 +1025,13 @@ TEST_F(MasterMaintenanceTest, MachineStatus) ASSERT_SOME(statuses); ASSERT_EQ(1, statuses.get().draining_machines().size()); ASSERT_EQ(0, statuses.get().down_machines().size()); - ASSERT_EQ("0.0.0.2", statuses.get().draining_machines(0).ip()); + ASSERT_EQ("0.0.0.2", statuses.get().draining_machines(0).id().ip()); } // Test ensures that accept and decline works with inverse offers. +// And that accepted/declined inverse offers will be reflected +// in the maintenance status endpoint. TEST_F(MasterMaintenanceTest, InverseOffers) { // Set up a master. @@ -1061,6 +1063,24 @@ TEST_F(MasterMaintenanceTest, InverseOffers) AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + // Sanity check that this machine shows up in the status endpoint + // and there should be no inverse offer status. + response = process::http::get(master.get(), "maintenance/status"); + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + + Try<JSON::Object> statuses_ = JSON::parse<JSON::Object>(response.get().body); + ASSERT_SOME(statuses_); + Try<maintenance::ClusterStatus> statuses = + ::protobuf::parse<maintenance::ClusterStatus>(statuses_.get()); + + ASSERT_SOME(statuses); + ASSERT_EQ(0, statuses.get().down_machines().size()); + ASSERT_EQ(1, statuses.get().draining_machines().size()); + ASSERT_EQ( + maintenanceHostname, + statuses.get().draining_machines(0).id().hostname()); + ASSERT_EQ(0, statuses.get().draining_machines(0).statuses().size()); + // Now start a framework. Callbacks callbacks; @@ -1194,6 +1214,30 @@ TEST_F(MasterMaintenanceTest, InverseOffers) EXPECT_EQ(1, event.get().offers().inverse_offers().size()); inverseOffer = event.get().offers().inverse_offers(0); + // Check that the status endpoint shows the inverse offer as declined. + response = process::http::get(master.get(), "maintenance/status"); + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + + statuses_ = JSON::parse<JSON::Object>(response.get().body); + ASSERT_SOME(statuses_); + statuses = ::protobuf::parse<maintenance::ClusterStatus>(statuses_.get()); + + ASSERT_SOME(statuses); + ASSERT_EQ(0, statuses.get().down_machines().size()); + ASSERT_EQ(1, statuses.get().draining_machines().size()); + ASSERT_EQ( + maintenanceHostname, + statuses.get().draining_machines(0).id().hostname()); + + ASSERT_EQ(1, statuses.get().draining_machines(0).statuses().size()); + ASSERT_EQ( + mesos::master::InverseOfferStatus::DECLINE, + statuses.get().draining_machines(0).statuses(0).status()); + + ASSERT_EQ( + id, + evolve(statuses.get().draining_machines(0).statuses(0).framework_id())); + { // Accept an inverse offer, with filter. Call call; @@ -1218,6 +1262,30 @@ TEST_F(MasterMaintenanceTest, InverseOffers) EXPECT_EQ(0, event.get().offers().offers().size()); EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + // Check that the status endpoint shows the inverse offer as accepted. + response = process::http::get(master.get(), "maintenance/status"); + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + + statuses_ = JSON::parse<JSON::Object>(response.get().body); + ASSERT_SOME(statuses_); + statuses = ::protobuf::parse<maintenance::ClusterStatus>(statuses_.get()); + + ASSERT_SOME(statuses); + ASSERT_EQ(0, statuses.get().down_machines().size()); + ASSERT_EQ(1, statuses.get().draining_machines().size()); + ASSERT_EQ( + maintenanceHostname, + statuses.get().draining_machines(0).id().hostname()); + + ASSERT_EQ(1, statuses.get().draining_machines(0).statuses().size()); + ASSERT_EQ( + mesos::master::InverseOfferStatus::ACCEPT, + statuses.get().draining_machines(0).statuses(0).status()); + + ASSERT_EQ( + id, + evolve(statuses.get().draining_machines(0).statuses(0).framework_id())); + EXPECT_CALL(exec, shutdown(_)) .Times(AtMost(1)); http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index ff241cc..3e58b45 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -1365,6 +1365,12 @@ ACTION_P(InvokeUpdateInverseOffer, allocator) } +ACTION_P(InvokeGetInverseOfferStatuses, allocator) +{ + return allocator->real->getInverseOfferStatuses(); +} + + ACTION_P(InvokeRecoverResources, allocator) { allocator->real->recoverResources(arg0, arg1, arg2, arg3); @@ -1504,6 +1510,11 @@ public: EXPECT_CALL(*this, updateInverseOffer(_, _, _, _, _)) .WillRepeatedly(DoDefault()); + ON_CALL(*this, getInverseOfferStatuses()) + .WillByDefault(InvokeGetInverseOfferStatuses(this)); + EXPECT_CALL(*this, getInverseOfferStatuses()) + .WillRepeatedly(DoDefault()); + ON_CALL(*this, recoverResources(_, _, _, _)) .WillByDefault(InvokeRecoverResources(this)); EXPECT_CALL(*this, recoverResources(_, _, _, _)) @@ -1597,6 +1608,11 @@ public: const Option<mesos::master::InverseOfferStatus>&, const Option<Filters>&)); + MOCK_METHOD0(getInverseOfferStatuses, process::Future< + hashmap<SlaveID, hashmap< + FrameworkID, + mesos::master::InverseOfferStatus>>>()); + MOCK_METHOD4(recoverResources, void( const FrameworkID&, const SlaveID&,
