Repository: mesos
Updated Branches:
refs/heads/master 3f862f332 -> 848767b4f
Sent resource version uuid only for agent default resources.
It's not correct to send resource version uuids for local resource
providers during agent (re)registration because the total resources from
those local resource providers are not sent in the same message.
Consider the following sequence of events:
(1) Agent disconnects
(2) Speculative operation fails in an RP, the RP bumps the version uuid
(3) Agent updates the RP's resource version uuid
(4) Agent reregisters
(5) Master is informed about the new resource version uuid of that RP
(6) Master still has the old total of the RP
(7) Framework launches an operation assuming the old total, but with the
new resource version uuid
This patch updates the `RegisterSlaveMessage` and
`ReregisterSlaveMessage` to only send resource version uuids for the
agent default resources.
Review: https://reviews.apache.org/r/64494
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/848767b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/848767b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/848767b4
Branch: refs/heads/master
Commit: 848767b4f3b503944936905ca498dc77681cce24
Parents: c3157d7
Author: Jie Yu <[email protected]>
Authored: Sun Dec 10 11:42:54 2017 -0800
Committer: Jie Yu <[email protected]>
Committed: Mon Dec 11 14:02:00 2017 -0800
----------------------------------------------------------------------
src/master/master.cpp | 64 +++++++++++++++++++++++++++++-----------
src/master/master.hpp | 16 ++++++----
src/messages/messages.proto | 46 +++++++++++++++--------------
src/slave/slave.cpp | 24 +++------------
src/tests/slave_tests.cpp | 16 +++++-----
5 files changed, 92 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 120cb75..b10d034 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6297,6 +6297,15 @@ void Master::__registerSlave(
vector<Resource> checkpointedResources = google::protobuf::convert(
std::move(*registerSlaveMessage.mutable_checkpointed_resources()));
+ Option<UUID> resourceVersion;
+ if (registerSlaveMessage.has_resource_version_uuid()) {
+ Try<UUID> uuid = UUID::fromBytes(
+ registerSlaveMessage.resource_version_uuid());
+
+ CHECK_SOME(uuid);
+ resourceVersion = uuid.get();
+ }
+
Slave* slave = new Slave(
this,
slaveInfo,
@@ -6306,8 +6315,7 @@ void Master::__registerSlave(
std::move(agentCapabilities),
Clock::now(),
std::move(checkpointedResources),
- protobuf::parseResourceVersions(
- registerSlaveMessage.resource_version_uuids()));
+ resourceVersion);
++metrics->slave_registrations;
@@ -6812,6 +6820,15 @@ void Master::__reregisterSlave(
vector<ExecutorInfo> executorInfos = google::protobuf::convert(
std::move(*reregisterSlaveMessage.mutable_executor_infos()));
+ Option<UUID> resourceVersion;
+ if (reregisterSlaveMessage.has_resource_version_uuid()) {
+ Try<UUID> uuid = UUID::fromBytes(
+ reregisterSlaveMessage.resource_version_uuid());
+
+ CHECK_SOME(uuid);
+ resourceVersion = uuid.get();
+ }
+
Slave* slave = new Slave(
this,
slaveInfo,
@@ -6821,8 +6838,7 @@ void Master::__reregisterSlave(
std::move(agentCapabilities),
Clock::now(),
std::move(checkpointedResources),
- protobuf::parseResourceVersions(
- reregisterSlaveMessage.resource_version_uuids()),
+ resourceVersion,
std::move(executorInfos),
std::move(recoveredTasks));
@@ -6945,11 +6961,21 @@ void Master::___reregisterSlave(
const string& version = reregisterSlaveMessage.version();
const vector<SlaveInfo::Capability> agentCapabilities =
google::protobuf::convert(reregisterSlaveMessage.agent_capabilities());
- const vector<ResourceVersionUUID> resourceVersions =
- google::protobuf::convert(reregisterSlaveMessage.resource_version_uuids());
- Try<Nothing> stateUpdated =
- slave->update(slaveInfo, version, agentCapabilities, resourceVersions);
+ Option<UUID> resourceVersion;
+ if (reregisterSlaveMessage.has_resource_version_uuid()) {
+ Try<UUID> uuid = UUID::fromBytes(
+ reregisterSlaveMessage.resource_version_uuid());
+
+ CHECK_SOME(uuid);
+ resourceVersion = uuid.get();
+ }
+
+ Try<Nothing> stateUpdated = slave->update(
+ slaveInfo,
+ version,
+ agentCapabilities,
+ resourceVersion);
// As of now, the only way `slave->update()` can fail is if the agent sent
// different checkpointed resources than it had before. A well-behaving
@@ -11274,7 +11300,7 @@ Slave::Slave(
vector<SlaveInfo::Capability> _capabilites,
const Time& _registeredTime,
vector<Resource> _checkpointedResources,
- hashmap<Option<ResourceProviderID>, UUID> _resourceVersions,
+ const Option<UUID>& resourceVersion,
vector<ExecutorInfo> executorInfos,
vector<Task> tasks)
: master(_master),
@@ -11288,8 +11314,7 @@ Slave::Slave(
connected(true),
active(true),
checkpointedResources(std::move(_checkpointedResources)),
- observer(nullptr),
- resourceVersions(std::move(_resourceVersions))
+ observer(nullptr)
{
CHECK(info.has_id());
@@ -11301,6 +11326,10 @@ Slave::Slave(
CHECK_SOME(resources);
totalResources = resources.get();
+ if (resourceVersion.isSome()) {
+ resourceVersions.put(None(), resourceVersion.get());
+ }
+
foreach (ExecutorInfo& executorInfo, executorInfos) {
CHECK(executorInfo.has_framework_id());
addExecutor(executorInfo.framework_id(), std::move(executorInfo));
@@ -11578,10 +11607,10 @@ void Slave::apply(const vector<ResourceConversion>&
conversions)
Try<Nothing> Slave::update(
- const SlaveInfo& _info,
- const string& _version,
- const vector<SlaveInfo::Capability>& _capabilities,
- const vector<ResourceVersionUUID>& _resourceVersions)
+ const SlaveInfo& _info,
+ const string& _version,
+ const vector<SlaveInfo::Capability>& _capabilities,
+ const Option<UUID>& resourceVersion)
{
Try<Resources> resources = applyCheckpointedResources(
_info.resources(),
@@ -11602,8 +11631,9 @@ Try<Nothing> Slave::update(
// re-registering in this case.
totalResources = resources.get();
- resourceVersions = protobuf::parseResourceVersions(
- {_resourceVersions.begin(), _resourceVersions.end()});
+ if (resourceVersion.isSome()) {
+ resourceVersions.put(None(), resourceVersion.get());
+ }
return Nothing();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7411e0b..232cc37 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -124,7 +124,7 @@ struct Slave
std::vector<SlaveInfo::Capability> _capabilites,
const process::Time& _registeredTime,
std::vector<Resource> _checkpointedResources,
- hashmap<Option<ResourceProviderID>, UUID> _resourceVersions,
+ const Option<UUID>& resourceVersion,
std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
std::vector<Task> tasks = std::vector<Task>());
@@ -177,10 +177,10 @@ struct Slave
void apply(const std::vector<ResourceConversion>& conversions);
Try<Nothing> update(
- const SlaveInfo& info,
- const std::string& _version,
- const std::vector<SlaveInfo::Capability>& _capabilites,
- const std::vector<ResourceVersionUUID>& resourceVersions);
+ const SlaveInfo& info,
+ const std::string& _version,
+ const std::vector<SlaveInfo::Capability>& _capabilites,
+ const Option<UUID>& resourceVersion);
Master* const master;
const SlaveID id;
@@ -263,12 +263,16 @@ struct Slave
// persistent volumes, dynamic reservations, etc). These are either
// in use by a task/executor, or are available for use and will be
// re-offered to the framework.
+ // TODO(jieyu): `checkpointedResources` is only for agent default
+ // resources. Resources from resource providers are not included in
+ // this field. Consider removing this field.
Resources checkpointedResources;
// The current total resources of the slave. Note that this is
// different from 'info.resources()' because this also considers
// operations (e.g., CREATE, RESERVE) that have been applied and
- // includes revocable resources as well.
+ // includes revocable resources and resources from resource
+ // providers as well.
Resources totalResources;
SlaveObserver* observer;
http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index a13a641..e680cd5 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -514,18 +514,19 @@ message RegisterSlaveMessage {
// frameworks).
repeated SlaveInfo.Capability agent_capabilities = 4;
- // Used to establish the relationship between the operation and the
+ // Resource version UUID for agent default resources. Used to
+ // establish the relationship between the operation and the
// resources that the operation is operating on. Each resource
- // provider will keep a resource version UUID, and change it when
- // it believes that the resources from this resource provider are
- // out of sync from the master's view. The master will keep track
- // of the last known resource version UUID for each resource
- // provider, and attach the resource version UUID in each operation
- // it sends out. The resource provider should reject operations that
- // have a different resource version UUID than that it maintains,
- // because this means the operation is operating on resources that
- // might have already been invalidated.
- repeated ResourceVersionUUID resource_version_uuids = 5;
+ // provider will keep a resource version UUID, and change it when it
+ // believes that the resources from this resource provider are out
+ // of sync from the master's view. The master will keep track of
+ // the last known resource version UUID for each resource provider,
+ // and attach the resource version UUID in each operation it sends
+ // out. The resource provider should reject operations that have a
+ // different resource version UUID than that it maintains, because
+ // this means the operation is operating on resources that might
+ // have already been invalidated.
+ optional bytes resource_version_uuid = 5;
}
@@ -572,18 +573,19 @@ message ReregisterSlaveMessage {
// frameworks).
repeated SlaveInfo.Capability agent_capabilities = 9;
- // Used to establish the relationship between the operation and the
+ // Resource version UUID for agent default resources. Used to
+ // establish the relationship between the operation and the
// resources that the operation is operating on. Each resource
- // provider will keep a resource version UUID, and change it when
- // it believes that the resources from this resource provider are
- // out of sync from the master's view. The master will keep track
- // of the last known resource version UUID for each resource
- // provider, and attach the resource version UUID in each operation
- // it sends out. The resource provider should reject operations that
- // have a different resource version UUID than that it maintains,
- // because this means the operation is operating on resources that
- // might have already been invalidated.
- repeated ResourceVersionUUID resource_version_uuids = 10;
+ // provider will keep a resource version UUID, and change it when it
+ // believes that the resources from this resource provider are out
+ // of sync from the master's view. The master will keep track of
+ // the last known resource version UUID for each resource provider,
+ // and attach the resource version UUID in each operation it sends
+ // out. The resource provider should reject operations that have a
+ // different resource version UUID than that it maintains, because
+ // this means the operation is operating on resources that might
+ // have already been invalidated.
+ optional bytes resource_version_uuid = 10;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5d4cd6d..302bcd3 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1540,19 +1540,11 @@ void Slave::doReliableRegistration(Duration maxBackoff)
message.mutable_agent_capabilities()->CopyFrom(
capabilities.toRepeatedPtrField());
- ResourceVersionUUID* uuid = message.add_resource_version_uuids();
- uuid->set_uuid(resourceVersion.toBytes());
-
- foreachvalue (ResourceProvider* provider, resourceProviders) {
- ResourceVersionUUID* uuid = message.add_resource_version_uuids();
- CHECK(provider->info.has_id());
- uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
- uuid->set_uuid(provider->resourceVersion.toBytes());
- }
-
// Include checkpointed resources.
message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
+ message.set_resource_version_uuid(resourceVersion.toBytes());
+
send(master.get(), message);
} else {
// Re-registering, so send tasks running.
@@ -1562,19 +1554,11 @@ void Slave::doReliableRegistration(Duration maxBackoff)
message.mutable_agent_capabilities()->CopyFrom(
capabilities.toRepeatedPtrField());
- ResourceVersionUUID* uuid = message.add_resource_version_uuids();
- uuid->set_uuid(resourceVersion.toBytes());
-
- foreachvalue (ResourceProvider* provider, resourceProviders) {
- ResourceVersionUUID* uuid = message.add_resource_version_uuids();
- CHECK(provider->info.has_id());
- uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
- uuid->set_uuid(provider->resourceVersion.toBytes());
- }
-
// Include checkpointed resources.
message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
+ message.set_resource_version_uuid(resourceVersion.toBytes());
+
message.mutable_slave()->CopyFrom(slaveInfo);
foreachvalue (Framework* framework, frameworks) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 0fb2a63..5228e03 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8961,7 +8961,9 @@ TEST_F(SlaveTest, ResourceVersions)
FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
StandaloneMasterDetector detector(master.get()->pid);
+
slave::Flags slaveFlags = CreateSlaveFlags();
+
Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
ASSERT_SOME(slave);
@@ -8972,9 +8974,7 @@ TEST_F(SlaveTest, ResourceVersions)
// Since no resource providers registered, the agent only sends its
// own resource version uuid. The agent has no resource provider id.
- ASSERT_EQ(1u, registerSlaveMessage->resource_version_uuids().size());
- EXPECT_FALSE(registerSlaveMessage->resource_version_uuids(0)
- .has_resource_provider_id());
+ ASSERT_TRUE(registerSlaveMessage->has_resource_version_uuid());
// Check that the agent sends its resource version uuid in
// `ReregisterSlaveMessage`.
@@ -8991,15 +8991,13 @@ TEST_F(SlaveTest, ResourceVersions)
AWAIT_READY(reregisterSlaveMessage);
// No resource changes occurred on the agent and we expect the
- // resource version uuids to be unchanged to the ones sent in the
+ // resource version uuid to be unchanged to the one sent in the
// original registration.
- ASSERT_EQ(
- registerSlaveMessage->resource_version_uuids_size(),
- reregisterSlaveMessage->resource_version_uuids_size());
+ ASSERT_TRUE(reregisterSlaveMessage->has_resource_version_uuid());
EXPECT_EQ(
- registerSlaveMessage->resource_version_uuids(0),
- reregisterSlaveMessage->resource_version_uuids(0));
+ registerSlaveMessage->resource_version_uuid(),
+ reregisterSlaveMessage->resource_version_uuid());
}