nlu90 commented on a change in pull request #3142: Validate resource constraint
(RAM and CPU) in RoundRobinPacking
URL: https://github.com/apache/incubator-heron/pull/3142#discussion_r245093794
##########
File path:
heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
##########
@@ -211,69 +248,75 @@ private ByteAmount
getContainerRamPadding(List<TopologyAPI.Config.KeyValue> topo
daemonProcessPadding);
}
- /**
- * Calculate the RAM required by any instance in the container
- *
- * @param allocation the allocation of instances in different container
- * @return A map: (containerId -> (instanceId -> instanceRequiredRam))
- */
- private Map<Integer, Map<InstanceId, ByteAmount>>
getInstancesRamMapInContainer(
- Map<Integer, List<InstanceId>> allocation) {
- Map<String, ByteAmount> ramMap =
TopologyUtils.getComponentRamMapConfig(topology);
-
- Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMapInContainer = new
HashMap<>();
- ByteAmount containerRamHint = getContainerRamHint(allocation);
+ @SuppressWarnings("unchecked")
+ private <T extends ResourceMeasure> Map<Integer, Map<InstanceId, T>>
+ calculateInstancesResourceMapInContainer(
+ Map<Integer, List<InstanceId>> allocation,
+ Map<String, T> resMap,
+ T containerResHint,
+ T instanceResDefault,
+ T containerResPadding,
+ T zero,
+ T notSpecified,
+ String resourceType) {
+ Map<Integer, Map<InstanceId, T>> instancesResMapInContainer = new
HashMap<>();
for (int containerId : allocation.keySet()) {
List<InstanceId> instanceIds = allocation.get(containerId);
- Map<InstanceId, ByteAmount> ramInsideContainer = new HashMap<>();
- instancesRamMapInContainer.put(containerId, ramInsideContainer);
- List<InstanceId> instancesToBeAccounted = new ArrayList<>();
+ Map<InstanceId, T> resInsideContainer = new HashMap<>();
+ instancesResMapInContainer.put(containerId, resInsideContainer);
+ List<InstanceId> unspecifiedInstances = new ArrayList<>();
- // Calculate the actual value
- ByteAmount usedRam = ByteAmount.ZERO;
+ // Register the instance resource allocation and calculate the used
resource so far
+ T usedRes = zero;
for (InstanceId instanceId : instanceIds) {
String componentName = instanceId.getComponentName();
- if (ramMap.containsKey(componentName)) {
- ByteAmount ram = ramMap.get(componentName);
- ramInsideContainer.put(instanceId, ram);
- usedRam = usedRam.plus(ram);
+ if (resMap.containsKey(componentName)) {
+ T res = resMap.get(componentName);
+ resInsideContainer.put(instanceId, res);
+ usedRes = (T) usedRes.plus(res);
} else {
- instancesToBeAccounted.add(instanceId);
+ unspecifiedInstances.add(instanceId);
}
}
- // Now we have calculated RAM for instances specified in ComponentRamMap
- // Then to calculate RAM for the rest instances
- int instancesToAllocate = instancesToBeAccounted.size();
+ // Validate instance resources specified so far don't violate
container-level constraint
+ if (!containerResHint.equals(notSpecified)
+ && usedRes.greaterThan(containerResHint.minus(containerResPadding)))
{
+ throw new PackingException(String.format("Invalid packing plan
generated. "
+ + "Total instance %s in a container (%s) have exceeded "
+ + "the container-level constraint of %s.",
+ resourceType, usedRes.toString(), containerResHint));
+ }
- if (instancesToAllocate != 0) {
- ByteAmount individualInstanceRam = instanceRamDefault;
+ // calculate resource for the remaining unspecified instances if any
+ if (!unspecifiedInstances.isEmpty()) {
+ T individualInstanceRes = instanceResDefault;
- // The RAM map is partially set. We need to calculate RAM for the rest
+ // If container resource is specified
+ if (!containerResHint.equals(notSpecified)) {
+ // discount resource for heron internal process (padding) and used
(usedRes)
+ T remainingRes = (T) ((T)
containerResHint.minus(containerResPadding)).minus(usedRes);
- // We have different strategy depending on whether container RAM is
specified
- // If container RAM is specified
- if (!containerRamHint.equals(NOT_SPECIFIED_NUMBER_VALUE)) {
- // remove RAM for heron internal process
- ByteAmount remainingRam =
- containerRamHint.minus(containerRamPadding).minus(usedRam);
+ if (remainingRes.lessOrEqual(zero)) {
+ throw new PackingException(String.format("Invalid packing plan
generated. "
+ + "No enough %s to allocate for unspecified instances",
resourceType));
+ }
// Split remaining RAM evenly
- individualInstanceRam = remainingRam.divide(instancesToAllocate);
+ individualInstanceRes = (T)
remainingRes.divide(unspecifiedInstances.size());
}
// Put the results in instancesRam
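Review comment:
Please update the comment above: it still refers to `instancesRam`, but the
method is now generic over the resource type rather than RAM only.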
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services