[
https://issues.apache.org/jira/browse/MESOS-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael Park updated MESOS-2373:
--------------------------------
Description:
Currently the {{DRFSorter}} aggregates total and allocated resources across
multiple slaves, which only works for scalar resources. We need to distinguish
resources from different slaves.
Suppose we have 2 slaves and 1 framework. The framework is allocated all
resources from both slaves.
{code}
Resources slaveResources =
  Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get();
DRFSorter sorter;
sorter.add(slaveResources); // Add slave1 resources
sorter.add(slaveResources); // Add slave2 resources
// Total resources in sorter at this point is
// cpus(*):4; mem(*):1024; ports(*):[31000-32000].
// The scalar resources get aggregated correctly but ports do not.
sorter.add("F");
// The 2 calls to allocated only work because we simply do:
// allocation[name] += resources;
// without checking that the 'resources' is available in the total.
sorter.allocated("F", slaveResources);
sorter.allocated("F", slaveResources);
// At this point, sorter.allocation("F") is:
// cpus(*):4; mem(*):1024; ports(*):[31000-32000].
{code}
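The aggregation behavior described in the comments above can be reproduced with a small self-contained model. This is only a sketch: {{ToyResources}} and {{makeResources}} are hypothetical stand-ins rather than the Mesos {{Resources}} API, and the sketch assumes that adding range resources behaves like a set union, which is what the totals above indicate.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical stand-in for mesos::Resources (illustration only):
// scalars are summed, and a range resource is modeled as a set of ports.
struct ToyResources {
  std::map<std::string, double> scalars;
  std::set<int> ports;

  ToyResources& operator+=(const ToyResources& that) {
    for (const auto& entry : that.scalars) {
      scalars[entry.first] += entry.second;  // Scalars accumulate.
    }
    // Set union: adding ports that are already present changes nothing.
    ports.insert(that.ports.begin(), that.ports.end());
    return *this;
  }
};

// Builds resources with the given scalars and the port range [first, last].
ToyResources makeResources(double cpus, double mem, int first, int last) {
  assert(first <= last);
  ToyResources resources;
  resources.scalars["cpus"] = cpus;
  resources.scalars["mem"] = mem;
  for (int port = first; port <= last; ++port) {
    resources.ports.insert(port);
  }
  return resources;
}
```

Adding {{makeResources(2, 512, 31000, 32000)}} to an empty {{ToyResources}} twice doubles {{cpus}} and {{mem}}, but the port set still holds a single copy of 31000-32000, so the total no longer records that two slaves each contribute that range.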
To provide some context, this issue came up while trying to reserve all
unreserved resources from every offer.
{code}
for (const Offer& offer : offers) {
  Resources unreserved = offer.resources().unreserved();
  Resources reserved = unreserved.flatten(role, Resource::FRAMEWORK);

  Offer::Operation reserve;
  reserve.set_type(Offer::Operation::RESERVE);
  reserve.mutable_reserve()->mutable_resources()->CopyFrom(reserved);

  driver->acceptOffers({offer.id()}, {reserve});
}
{code}
Suppose the slave resources are the same as above:
{quote}
Slave1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
Slave2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
{quote}
The initial (incorrect) total resources in the DRFSorter are:
{quote}
{{cpus(\*):4; mem(\*):1024; ports(\*):\[31000-32000\]}}
{quote}
We receive 2 offers, 1 from each slave:
{quote}
Offer1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
Offer2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
{quote}
At this point, the resources allocated to the framework are:
{quote}
{{cpus(\*):4; mem(\*):1024; ports(\*):\[31000-32000\]}}
{quote}
After the first {{RESERVE}} operation with Offer1, the allocated resources for
the framework become:
{quote}
{{cpus(\*):2; mem(\*):512; cpus(role):2; mem(role):512;
ports(role):\[31000-32000\]}}
{quote}
During the second {{RESERVE}} operation with Offer2:
{code:title=HierarchicalAllocatorProcess::updateAllocation}
// ...
FrameworkSorter* frameworkSorter =
  frameworkSorters[frameworks[frameworkId].role];
Resources allocation = frameworkSorter->allocation(frameworkId.value());
// Update the allocated resources.
Try<Resources> updatedAllocation = allocation.apply(operations);
CHECK_SOME(updatedAllocation);
// ...
{code}
{{allocation}} in the above code is:
{quote}
{{cpus(\*):2; mem(\*):512; cpus(role):2; mem(role):512;
ports(role):\[31000-32000\]}}
{quote}
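We try to {{apply}} the second {{RESERVE}} operation but fail to find
{{ports(\*):\[31000-32000\]}} in the allocation, because the first operation
already converted it to {{ports(role):\[31000-32000\]}}. {{apply}} therefore
returns an error, and {{CHECK_SOME(updatedAllocation);}} fails.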
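To make the failure concrete, the following is a minimal sketch of the two bookkeeping schemes. Everything in it is hypothetical: {{Ports}}, {{unreservedPorts}} and {{reserve}} are toy names invented for illustration and are not part of Mesos, and keying the allocation by slave is shown only as one possible direction, not as the fix that will be adopted.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical toy model, not the Mesos API: each port maps to the role
// that holds it, where "*" means unreserved.
using Ports = std::map<int, std::string>;

// Builds the unreserved port range [first, last].
Ports unreservedPorts(int first, int last) {
  assert(first <= last);
  Ports ports;
  for (int port = first; port <= last; ++port) {
    ports[port] = "*";
  }
  return ports;
}

// Toy RESERVE: moves [first, last] from "*" to 'role'. Returns false and
// leaves 'ports' untouched if any port in the range is not unreserved.
bool reserve(Ports& ports, int first, int last, const std::string& role) {
  assert(first <= last);
  for (int port = first; port <= last; ++port) {
    auto it = ports.find(port);
    if (it == ports.end() || it->second != "*") {
      return false;
    }
  }
  for (int port = first; port <= last; ++port) {
    ports[port] = role;
  }
  return true;
}
```

With a single aggregated {{Ports}} built from {{unreservedPorts(31000, 32000)}}, the first {{reserve}} call succeeds and the second returns false, mirroring the failed {{apply}}. If the ports are instead stored in a {{std::map<std::string, Ports>}} keyed by a slave identifier, each slave has its own unreserved range and both calls succeed.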
was:
Currently the {{DRFSorter}} aggregates total and allocated resources across
multiple slaves, which only works for scalar resources. We need to distinguish
resources from different slaves.
Suppose we have 2 slaves and 1 framework. The framework is allocated all
resources from both slaves.
{code}
Resources slaveResources =
Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get();
DRFSorter sorter;
sorter.add(slaveResources); // Add slave1 resources
sorter.add(slaveResources); // Add slave2 resources
// Total resources in sorter at this point is
// cpus(*):4; mem(*):1024; ports(*):[31000-32000].
// The scalar resources get aggregated correctly but ports do not.
sorter.add("F");
// The 2 calls to allocated only works because we simply do:
// allocation[name] += resources;
// without checking that the 'resources' is available in the total.
sorter.allocated("F", slaveResources);
sorter.allocated("F", slaveResources);
// At this point, sorter.allocation("F") is:
// cpus(*):4; mem(*):1024; ports(*):[31000-32000].
{code}
To provide some context, this issue came up while trying to reserve all
unreserved resources from every offer.
{code}
for (const Offer& offer : offers) {
Resources unreserved = offer.resources().unreserved();
Resources reserved = unreserved.flatten(role, Resource::FRAMEWORK);
Offer::Operation reserve;
reserve.set_type(Offer::Operation::RESERVE);
reserve.mutable_reserve()->mutable_resources()->CopyFrom(reserved);
driver->acceptOffers({offer.id()}, {reserve});
}
{code}
Suppose the slave resources are the same as above:
{quote}
Slave1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
Slave2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
{quote}
Initial (incorrect) total resources in the DRFSorter is:
{quote}
{{cpus(\*):4; mem(\*):1024; ports(\*):\[31000-32000\]}}
{quote}
We receive 2 offers, 1 from each slave:
{quote}
Offer1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
Offer2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
{quote}
After first {{RESERVE}} operation with Offer1:
The total resources in DRFSorter becomes:
{quote}
{{cpus(\*):2; mem(\*):512; cpus(role):2; mem(role):512;
ports(role):\[31000-32000\]}}
{quote}
During second {{RESERVE}} operation with Offer2:
We fail to find {{ports:\[31000-32000\]}}, and we {{CHECK}} fail.
> DRFSorter needs to distinguish resources from different slaves.
> ---------------------------------------------------------------
>
> Key: MESOS-2373
> URL: https://issues.apache.org/jira/browse/MESOS-2373
> Project: Mesos
> Issue Type: Bug
> Components: allocation
> Reporter: Michael Park
> Labels: mesosphere
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)