nlu90 commented on a change in pull request #3142: Validate resource constraint 
(RAM and CPU) in RoundRobinPacking
URL: https://github.com/apache/incubator-heron/pull/3142#discussion_r245093753
 
 

 ##########
 File path: 
heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
 ##########
 @@ -211,69 +248,75 @@ private ByteAmount 
getContainerRamPadding(List<TopologyAPI.Config.KeyValue> topo
         daemonProcessPadding);
   }
 
-  /**
-   * Calculate the RAM required by any instance in the container
-   *
-   * @param allocation the allocation of instances in different container
-   * @return A map: (containerId -&gt; (instanceId -&gt; instanceRequiredRam))
-   */
-  private Map<Integer, Map<InstanceId, ByteAmount>> 
getInstancesRamMapInContainer(
-      Map<Integer, List<InstanceId>> allocation) {
-    Map<String, ByteAmount> ramMap = 
TopologyUtils.getComponentRamMapConfig(topology);
-
-    Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMapInContainer = new 
HashMap<>();
-    ByteAmount containerRamHint = getContainerRamHint(allocation);
+  @SuppressWarnings("unchecked")
+  private <T extends ResourceMeasure> Map<Integer, Map<InstanceId, T>>
+            calculateInstancesResourceMapInContainer(
+                Map<Integer, List<InstanceId>> allocation,
+                Map<String, T> resMap,
+                T containerResHint,
+                T instanceResDefault,
+                T containerResPadding,
+                T zero,
+                T notSpecified,
+                String resourceType) {
+    Map<Integer, Map<InstanceId, T>> instancesResMapInContainer = new 
HashMap<>();
 
     for (int containerId : allocation.keySet()) {
       List<InstanceId> instanceIds = allocation.get(containerId);
-      Map<InstanceId, ByteAmount> ramInsideContainer = new HashMap<>();
-      instancesRamMapInContainer.put(containerId, ramInsideContainer);
-      List<InstanceId> instancesToBeAccounted = new ArrayList<>();
+      Map<InstanceId, T> resInsideContainer = new HashMap<>();
+      instancesResMapInContainer.put(containerId, resInsideContainer);
+      List<InstanceId> unspecifiedInstances = new ArrayList<>();
 
-      // Calculate the actual value
-      ByteAmount usedRam = ByteAmount.ZERO;
+      // Register the instance resource allocation and calculate the used 
resource so far
+      T usedRes = zero;
       for (InstanceId instanceId : instanceIds) {
         String componentName = instanceId.getComponentName();
-        if (ramMap.containsKey(componentName)) {
-          ByteAmount ram = ramMap.get(componentName);
-          ramInsideContainer.put(instanceId, ram);
-          usedRam = usedRam.plus(ram);
+        if (resMap.containsKey(componentName)) {
+          T res = resMap.get(componentName);
+          resInsideContainer.put(instanceId, res);
+          usedRes = (T) usedRes.plus(res);
         } else {
-          instancesToBeAccounted.add(instanceId);
+          unspecifiedInstances.add(instanceId);
         }
       }
 
-      // Now we have calculated RAM for instances specified in ComponentRamMap
-      // Then to calculate RAM for the rest instances
-      int instancesToAllocate = instancesToBeAccounted.size();
+      // Validate instance resources specified so far don't violate 
container-level constraint
+      if (!containerResHint.equals(notSpecified)
+          && usedRes.greaterThan(containerResHint.minus(containerResPadding))) 
{
+        throw new PackingException(String.format("Invalid packing plan 
generated. "
+                + "Total instance %s in a container (%s) have exceeded "
+                + "the container-level constraint of %s.",
+            resourceType, usedRes.toString(), containerResHint));
+      }
 
-      if (instancesToAllocate != 0) {
-        ByteAmount individualInstanceRam = instanceRamDefault;
+      // calculate resource for the remaining unspecified instances if any
+      if (!unspecifiedInstances.isEmpty()) {
+        T individualInstanceRes = instanceResDefault;
 
-        // The RAM map is partially set. We need to calculate RAM for the rest
+        // If container resource is specified
+        if (!containerResHint.equals(notSpecified)) {
+          // discount resource for heron internal process (padding) and used 
(usedRes)
+          T remainingRes = (T) ((T) 
containerResHint.minus(containerResPadding)).minus(usedRes);
 
-        // We have different strategy depending on whether container RAM is 
specified
-        // If container RAM is specified
-        if (!containerRamHint.equals(NOT_SPECIFIED_NUMBER_VALUE)) {
-          // remove RAM for heron internal process
-          ByteAmount remainingRam =
-              containerRamHint.minus(containerRamPadding).minus(usedRam);
+          if (remainingRes.lessOrEqual(zero)) {
+            throw new PackingException(String.format("Invalid packing plan 
generated. "
+                + "No enough %s to allocate for unspecified instances", 
resourceType));
+          }
 
           // Split remaining RAM evenly
 
 Review comment:
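   Please update this comment: the method is now generic over the resource type (RAM and CPU), so "Split remaining RAM evenly" should refer to the remaining resource rather than to RAM specifically.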

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to