[ 
https://issues.apache.org/jira/browse/MESOS-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Park updated MESOS-2373:
--------------------------------
    Description: 
Currently the {{DRFSorter}} aggregates total and allocated resources across 
multiple slaves, which only works for scalar resources. We need to distinguish 
resources from different slaves.

Suppose we have 2 slaves and 1 framework. The framework is allocated all 
resources from both slaves.

{code}
Resources slaveResources =
  Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get();

DRFSorter sorter;

sorter.add(slaveResources);  // Add slave1 resources
sorter.add(slaveResources);  // Add slave2 resources

// Total resources in sorter at this point is
// cpus(*):4; mem(*):1024; ports(*):[31000-32000].
// The scalar resources get aggregated correctly but ports do not.

sorter.add("F");

// The 2 calls to allocated only works because we simply do:
//   allocation[name] += resources;
// without checking that the 'resources' is available in the total.

sorter.allocated("F", slaveResources);
sorter.allocated("F", slaveResources);

// At this point, sorter.allocation("F") is:
// cpus(*):4; mem(*):1024; ports(*):[31000-32000].
{code}

To provide some context, this issue came up while trying to reserve all 
unreserved resources from every offer.

{code}
for (const Offer& offer : offers) { 
  Resources unreserved = offer.resources().unreserved();
  Resources reserved = unreserved.flatten(role, Resource::FRAMEWORK); 

  Offer::Operation reserve;
  reserve.set_type(Offer::Operation::RESERVE); 
  reserve.mutable_reserve()->mutable_resources()->CopyFrom(reserved); 
 
  driver->acceptOffers({offer.id()}, {reserve}); 
} 
{code}

Suppose the slave resources are the same as above:

{quote}
Slave1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
Slave2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
{quote}

Initial (incorrect) total resources in the DRFSorter is:

{quote}
{{cpus(\*):4; mem(\*):1024; ports(\*):\[31000-32000\]}}
{quote}

We receive 2 offers, 1 from each slave:

{quote}
Offer1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
Offer2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
{quote}

At this point, the resources allocated for the framework is:

{quote}
{{cpus(\*):4; mem(\*):1024; ports(\*):\[31000-32000\]}}
{quote}

After first {{RESERVE}} operation with Offer1:

The allocated resources for the framework becomes:

{quote}
{{cpus(\*):2; mem(\*):512; cpus(role):2; mem(role):512; 
ports(role):\[31000-32000\]}}
{quote}

During second {{RESERVE}} operation with Offer2:

{code:title=HierarchicalAllocatorProcess::updateAllocation}
  // ...

  FrameworkSorter* frameworkSorter =
    frameworkSorters[frameworks\[frameworkId\].role];

  Resources allocation = frameworkSorter->allocation(frameworkId.value());

  // Update the allocated resources.
  Try<Resources> updatedAllocation = allocation.apply(operations);
  CHECK_SOME(updatedAllocation);

  // ...
{code}

{{allocation}} in the above code is:

{quote}
{{cpus(\*):2; mem(\*):512; cpus(role):2; mem(role):512; 
ports(role):\[31000-32000\]}}
{quote}

We try to {{apply}} a {{RESERVE}} operation and we fail to find 
{{ports(\*):\[31000-32000\]}} which leads to the {{CHECK}} fail at 
{{CHECK_SOME(updatedAllocation);}}

  was:
Currently the {{DRFSorter}} aggregates total and allocated resources across 
multiple slaves, which only works for scalar resources. We need to distinguish 
resources from different slaves.

Suppose we have 2 slaves and 1 framework. The framework is allocated all 
resources from both slaves.

{code}
Resources slaveResources =
  Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get();

DRFSorter sorter;

sorter.add(slaveResources);  // Add slave1 resources
sorter.add(slaveResources);  // Add slave2 resources

// Total resources in sorter at this point is
// cpus(*):4; mem(*):1024; ports(*):[31000-32000].
// The scalar resources get aggregated correctly but ports do not.

sorter.add("F");

// The 2 calls to allocated only works because we simply do:
//   allocation[name] += resources;
// without checking that the 'resources' is available in the total.

sorter.allocated("F", slaveResources);
sorter.allocated("F", slaveResources);

// At this point, sorter.allocation("F") is:
// cpus(*):4; mem(*):1024; ports(*):[31000-32000].
{code}

To provide some context, this issue came up while trying to reserve all 
unreserved resources from every offer.

{code}
for (const Offer& offer : offers) { 
  Resources unreserved = offer.resources().unreserved();
  Resources reserved = unreserved.flatten(role, Resource::FRAMEWORK); 

  Offer::Operation reserve;
  reserve.set_type(Offer::Operation::RESERVE); 
  reserve.mutable_reserve()->mutable_resources()->CopyFrom(reserved); 
 
  driver->acceptOffers({offer.id()}, {reserve}); 
} 
{code}

Suppose the slave resources are the same as above:

{quote}
Slave1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
Slave2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
{quote}

Initial (incorrect) total resources in the DRFSorter is:

{quote}
{{cpus(\*):4; mem(\*):1024; ports(\*):\[31000-32000\]}}
{quote}

We receive 2 offers, 1 from each slave:

{quote}
Offer1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
Offer2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
{quote}

After first {{RESERVE}} operation with Offer1:

The total resources in DRFSorter becomes:

{quote}
{{cpus(\*):2; mem(\*):512; cpus(role):2; mem(role):512; 
ports(role):\[31000-32000\]}}
{quote}

During second {{RESERVE}} operation with Offer2:

We fail to find {{ports:\[31000-32000\]}}, and we {{CHECK}} fail.


> DRFSorter needs to distinguish resources from different slaves.
> ---------------------------------------------------------------
>
>                 Key: MESOS-2373
>                 URL: https://issues.apache.org/jira/browse/MESOS-2373
>             Project: Mesos
>          Issue Type: Bug
>          Components: allocation
>            Reporter: Michael Park
>              Labels: mesosphere
>
> Currently the {{DRFSorter}} aggregates total and allocated resources across 
> multiple slaves, which only works for scalar resources. We need to 
> distinguish resources from different slaves.
> Suppose we have 2 slaves and 1 framework. The framework is allocated all 
> resources from both slaves.
> {code}
> Resources slaveResources =
>   Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get();
> DRFSorter sorter;
> sorter.add(slaveResources);  // Add slave1 resources
> sorter.add(slaveResources);  // Add slave2 resources
> // Total resources in sorter at this point is
> // cpus(*):4; mem(*):1024; ports(*):[31000-32000].
> // The scalar resources get aggregated correctly but ports do not.
> sorter.add("F");
> // The 2 calls to allocated only works because we simply do:
> //   allocation[name] += resources;
> // without checking that the 'resources' is available in the total.
> sorter.allocated("F", slaveResources);
> sorter.allocated("F", slaveResources);
> // At this point, sorter.allocation("F") is:
> // cpus(*):4; mem(*):1024; ports(*):[31000-32000].
> {code}
> To provide some context, this issue came up while trying to reserve all 
> unreserved resources from every offer.
> {code}
> for (const Offer& offer : offers) { 
>   Resources unreserved = offer.resources().unreserved();
>   Resources reserved = unreserved.flatten(role, Resource::FRAMEWORK); 
>   Offer::Operation reserve;
>   reserve.set_type(Offer::Operation::RESERVE); 
>   reserve.mutable_reserve()->mutable_resources()->CopyFrom(reserved); 
>  
>   driver->acceptOffers({offer.id()}, {reserve}); 
> } 
> {code}
> Suppose the slave resources are the same as above:
> {quote}
> Slave1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
> Slave2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
> {quote}
> Initial (incorrect) total resources in the DRFSorter is:
> {quote}
> {{cpus(\*):4; mem(\*):1024; ports(\*):\[31000-32000\]}}
> {quote}
> We receive 2 offers, 1 from each slave:
> {quote}
> Offer1: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
> Offer2: {{cpus(\*):2; mem(\*):512; ports(\*):\[31000-32000\]}}
> {quote}
> At this point, the resources allocated for the framework is:
> {quote}
> {{cpus(\*):4; mem(\*):1024; ports(\*):\[31000-32000\]}}
> {quote}
> After first {{RESERVE}} operation with Offer1:
> The allocated resources for the framework becomes:
> {quote}
> {{cpus(\*):2; mem(\*):512; cpus(role):2; mem(role):512; 
> ports(role):\[31000-32000\]}}
> {quote}
> During second {{RESERVE}} operation with Offer2:
> {code:title=HierarchicalAllocatorProcess::updateAllocation}
>   // ...
>   FrameworkSorter* frameworkSorter =
>     frameworkSorters[frameworks\[frameworkId\].role];
>   Resources allocation = frameworkSorter->allocation(frameworkId.value());
>   // Update the allocated resources.
>   Try<Resources> updatedAllocation = allocation.apply(operations);
>   CHECK_SOME(updatedAllocation);
>   // ...
> {code}
> {{allocation}} in the above code is:
> {quote}
> {{cpus(\*):2; mem(\*):512; cpus(role):2; mem(role):512; 
> ports(role):\[31000-32000\]}}
> {quote}
> We try to {{apply}} a {{RESERVE}} operation and we fail to find 
> {{ports(\*):\[31000-32000\]}} which leads to the {{CHECK}} fail at 
> {{CHECK_SOME(updatedAllocation);}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to