This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f05f47  Make RoundRobin packing algo more forgivable when container 
resource is not set. Don't check if container resource is not set (#3273)
9f05f47 is described below

commit 9f05f47227aa87d0014621519e2d9c89f667a065
Author: Ning Wang <[email protected]>
AuthorDate: Thu May 23 15:39:05 2019 -0700

    Make RoundRobin packing algo more forgivable when container resource is not 
set. Don't check if container resource is not set (#3273)
---
 .../packing/roundrobin/RoundRobinPacking.java      |  75 +++++-----
 .../packing/roundrobin/RoundRobinPackingTest.java  | 154 +++++++++++++++++++++
 2 files changed, 198 insertions(+), 31 deletions(-)

diff --git 
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
 
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 3f87bf7..0b3f4c8 100644
--- 
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ 
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -149,13 +149,17 @@ public class RoundRobinPacking implements IPacking, 
IRepacking {
         getRoundRobinAllocation(numContainer, parallelismMap);
 
     Resource containerResourceHint = 
getContainerResourceHint(roundRobinAllocation);
+    int largestContainerSize = getLargestContainerSize(roundRobinAllocation);
 
     // Get the RAM map for every instance
+    ByteAmount containerRamDefault =
+        
instanceRamDefault.multiply(largestContainerSize).plus(containerRamPadding);
     Map<Integer, Map<InstanceId, ByteAmount>> instancesRamMap =
         calculateInstancesResourceMapInContainer(
         roundRobinAllocation,
         TopologyUtils.getComponentRamMapConfig(topology),
         containerResourceHint.getRam(),
+        containerRamDefault,
         instanceRamDefault,
         containerRamPadding,
         ByteAmount.ZERO,
@@ -163,11 +167,14 @@ public class RoundRobinPacking implements IPacking, 
IRepacking {
         RAM);
 
     // Get the CPU map for every instance
+    float containerCPUDefault =
+        Math.round(instanceCpuDefault * largestContainerSize + 
containerCpuPadding);
     Map<Integer, Map<InstanceId, CPUShare>> instancesCpuMap =
         calculateInstancesResourceMapInContainer(
         roundRobinAllocation,
         
CPUShare.convertDoubleMapToCpuShareMap(TopologyUtils.getComponentCpuMapConfig(topology)),
         CPUShare.fromDouble(containerResourceHint.getCpu()),
+        CPUShare.fromDouble(containerCPUDefault),
         CPUShare.fromDouble(instanceCpuDefault),
         CPUShare.fromDouble(containerCpuPadding),
         CPUShare.fromDouble(0.0),
@@ -205,22 +212,26 @@ public class RoundRobinPacking implements IPacking, 
IRepacking {
       }
 
       // finalize container resource
+      containerCpu += containerCpuPadding;
+      if (containerResourceHint.getCpu() != NOT_SPECIFIED_CPU_SHARE) {
+        containerCpu = Math.min(containerCpu, containerResourceHint.getCpu());
+      }
+
+      containerRam = containerRam.plus(containerRamPadding);
       if (!containerResourceHint.getRam().equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
         containerRam = ByteAmount.fromBytes(
-            Math.min(containerRam.plus(containerRamPadding).asBytes(),
-                containerResourceHint.getRam().asBytes()));
-      } else {
-        containerRam = containerRam.plus(containerRamPadding);
+            Math.min(containerRam.asBytes(), 
containerResourceHint.getRam().asBytes()));
       }
 
-      if (containerResourceHint.getCpu() != NOT_SPECIFIED_CPU_SHARE) {
-        containerCpu = Math.min(containerCpu + containerCpuPadding, 
containerResourceHint.getCpu());
-      } else {
-        containerCpu += containerCpuPadding;
+      ByteAmount containerDisk = containerResourceHint.getDisk();
+      if (containerDisk.equals(NOT_SPECIFIED_BYTE_AMOUNT)) {
+        containerDisk = instanceDiskDefault
+            
.multiply(largestContainerSize).plus(DEFAULT_DISK_PADDING_PER_CONTAINER);
       }
 
-      Resource resource = new Resource(Math.max(containerCpu, 
containerResourceHint.getCpu()),
-          containerRam, containerResourceHint.getDisk());
+      Resource resource = new Resource(
+          Math.max(containerCpu, containerResourceHint.getCpu()),
+          containerRam, containerDisk);
       PackingPlan.ContainerPlan containerPlan = new PackingPlan.ContainerPlan(
           containerId, new HashSet<>(instancePlanMap.values()), resource);
 
@@ -275,6 +286,7 @@ public class RoundRobinPacking implements IPacking, 
IRepacking {
                 Map<Integer, List<InstanceId>> allocation,
                 Map<String, T> resMap,
                 T containerResHint,
+                T defaultContainerRes,
                 T instanceResDefault,
                 T containerResPadding,
                 T zero,
@@ -323,28 +335,30 @@ public class RoundRobinPacking implements IPacking, 
IRepacking {
       }
 
       // calculate resource for the remaining unspecified instances if any
+      T containerRes = containerResHint;
+      if (containerResHint.equals(notSpecified)) {
+        containerRes = defaultContainerRes;
+      }
+
       if (!unspecifiedInstances.isEmpty()) {
         T individualInstanceRes = instanceResDefault;
 
-        // If container resource is specified
-        if (!containerResHint.equals(notSpecified)) {
-          // discount resource for heron internal process (padding) and used 
(usedRes)
-          T remainingRes;
-          if (paddingThrottling) {
-            remainingRes = (T) containerResHint.minus(usedRes);
-          } else {
-            remainingRes = (T) 
containerResHint.minus(containerResPadding).minus(usedRes);
-          }
-
-          if (remainingRes.lessOrEqual(zero)) {
-            throw new PackingException(String.format("Invalid packing plan 
generated. "
-                + "No enough %s to allocate for unspecified instances", 
resourceType));
-          }
-
-          // Split remaining resource evenly
-          individualInstanceRes = (T) 
remainingRes.divide(unspecifiedInstances.size());
+        // discount resource for heron internal process (padding) and used 
(usedRes)
+        T remainingRes;
+        if (paddingThrottling) {
+          remainingRes = (T) containerRes.minus(usedRes);
+        } else {
+          remainingRes = (T) 
containerRes.minus(containerResPadding).minus(usedRes);
         }
 
+        if (remainingRes.lessOrEqual(zero)) {
+          throw new PackingException(String.format("Invalid packing plan 
generated. "
+              + "No enough %s to allocate for unspecified instances", 
resourceType));
+        }
+
+        // Split remaining resource evenly
+        individualInstanceRes = (T) 
remainingRes.divide(unspecifiedInstances.size());
+
         // Put the results in resInsideContainer
         for (InstanceId instanceId : unspecifiedInstances) {
           resInsideContainer.put(instanceId, individualInstanceRes);
@@ -434,12 +448,11 @@ public class RoundRobinPacking implements IPacking, 
IRepacking {
 
     return new Resource(
         TopologyUtils.getConfigWithDefault(topologyConfig, 
TOPOLOGY_CONTAINER_CPU_REQUESTED,
-            (double) Math.round(instanceCpuDefault * largestContainerSize + 
containerCpuPadding)),
+            NOT_SPECIFIED_CPU_SHARE),
         TopologyUtils.getConfigWithDefault(topologyConfig, 
TOPOLOGY_CONTAINER_RAM_REQUESTED,
-            
instanceRamDefault.multiply(largestContainerSize).plus(containerRamPadding)),
+            NOT_SPECIFIED_BYTE_AMOUNT),
         TopologyUtils.getConfigWithDefault(topologyConfig, 
TOPOLOGY_CONTAINER_DISK_REQUESTED,
-            instanceDiskDefault.multiply(largestContainerSize)
-                .plus(DEFAULT_DISK_PADDING_PER_CONTAINER)));
+            NOT_SPECIFIED_BYTE_AMOUNT));
   }
 
   /**
diff --git 
a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
 
b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 5c3d3ac..a642a60 100644
--- 
a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ 
b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -390,6 +390,160 @@ public class RoundRobinPackingTest extends 
CommonPackingTests {
             numContainers, getDefaultPadding()).cloneWithCpu(containerCpu));
   }
 
+  @Test
+  public void testFullRamMapWithoutContainerRequestedResources() throws 
Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(6); // max container 
resource is 6G
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    ByteAmount spoutRam = ByteAmount.fromMegabytes(500);
+    ByteAmount boltRam = ByteAmount.fromMegabytes(1000);
+    Resource containerResource = new Resource(containerCpu, containerRam, 
containerDisk);
+
+    // Don't set container RAM
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentRam(SPOUT_NAME, spoutRam);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+        Optional.of(boltRam), Optional.empty(), boltParallelism,
+        Optional.of(spoutRam), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), containerResource);
+
+    for (PackingPlan.ContainerPlan containerPlan : 
packingPlan.getContainers()) {
+      // Collect the distinct resource requirements of the instances
+      // packed into this container
+      Set<Resource> differentResources = new HashSet<>();
+      for (PackingPlan.InstancePlan instancePlan : 
containerPlan.getInstances()) {
+        differentResources.add(instancePlan.getResource());
+      }
+
+      // Bolt and spout ram sizes are both fixed.
+      Assert.assertEquals(2, differentResources.size());
+    }
+  }
+
+  @Test
+  public void testNoRamMapWithoutContainerRequestedResources() throws 
Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(6); // max container 
resource is 6G
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    Resource containerResource = new Resource(containerCpu, containerRam, 
containerDisk);
+
+    // Container RAM is not set in config
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+        Optional.empty(), Optional.empty(), boltParallelism,
+        Optional.empty(), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), containerResource);
+
+    for (PackingPlan.ContainerPlan containerPlan : 
packingPlan.getContainers()) {
+      // All instances' resource requirement should be equal
+      // So the size of set should be 1
+      Set<Resource> differentResources = new HashSet<>();
+      for (PackingPlan.InstancePlan instancePlan : 
containerPlan.getInstances()) {
+        differentResources.add(instancePlan.getResource());
+      }
+
+      Assert.assertEquals(1, differentResources.size());
+      int instancesCount = containerPlan.getInstances().size();
+      Assert.assertEquals(containerRam
+          
.minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
+          differentResources.iterator().next().getRam());
+
+      Assert.assertEquals(
+          (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) 
/ instancesCount,
+          differentResources.iterator().next().getCpu(), DELTA);
+    }
+  }
+
+  @Test
+  public void testPartialRamMapWithoutContainerRequestedResources() throws 
Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(6); // max container 
resource is 6G
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    ByteAmount boltRam = ByteAmount.fromGigabytes(1);
+    Resource containerResource = new Resource(containerCpu, containerRam, 
containerDisk);
+
+    // Don't set container RAM
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+        Optional.of(boltRam), Optional.empty(), boltParallelism,
+        Optional.empty(), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), containerResource);
+
+    for (PackingPlan.ContainerPlan containerPlan : 
packingPlan.getContainers()) {
+      // Collect the distinct resource requirements of the instances
+      // in this container; the expected count depends on the container size
+      Set<Resource> differentResources = new HashSet<>();
+      for (PackingPlan.InstancePlan instancePlan : 
containerPlan.getInstances()) {
+        differentResources.add(instancePlan.getResource());
+      }
+
+      int instancesCount = containerPlan.getInstances().size();
+      if (instancesCount == 4) {
+        // Biggest container
+        Assert.assertEquals(1, differentResources.size());
+      } else {
+        // Smaller container
+        Assert.assertEquals(2, differentResources.size());
+      }
+    }
+  }
+
+  // Throw an error if default container resource (default instance resource * 
number of instances
+  // + padding) is not enough.
+  @Test(expected = PackingException.class)
+  public void testHugePartialRamMapWithoutContainerRequestedResources() throws 
Exception {
+    // Explicit set resources for container
+    ByteAmount containerRam = ByteAmount.fromGigabytes(10);
+    ByteAmount containerDisk = ByteAmount.fromGigabytes(20);
+    double containerCpu = 30;
+    Resource containerResource = new Resource(containerCpu, containerRam, 
containerDisk);
+    ByteAmount boltRam = ByteAmount.fromGigabytes(10);
+
+    // Don't set container RAM
+    topologyConfig.setContainerDiskRequested(containerDisk);
+    topologyConfig.setContainerCpuRequested(containerCpu);
+    topologyConfig.setComponentRam(BOLT_NAME, boltRam);
+    topology = getTopology(spoutParallelism, boltParallelism, topologyConfig);
+
+    PackingPlan packingPlan = doPackingTestWithPartialResource(topology,
+        Optional.of(boltRam), Optional.empty(), boltParallelism,
+        Optional.empty(), Optional.empty(), spoutParallelism,
+        numContainers, getDefaultPadding(), containerResource);
+
+    for (PackingPlan.ContainerPlan containerPlan : 
packingPlan.getContainers()) {
+      // All instances' resource requirement should be equal
+      // So the size of set should be 1
+      Set<Resource> differentResources = new HashSet<>();
+      for (PackingPlan.InstancePlan instancePlan : 
containerPlan.getInstances()) {
+        differentResources.add(instancePlan.getResource());
+      }
+
+      Assert.assertEquals(1, differentResources.size());
+      int instancesCount = containerPlan.getInstances().size();
+      Assert.assertEquals(containerRam
+          
.minus(RoundRobinPacking.DEFAULT_RAM_PADDING_PER_CONTAINER).divide(instancesCount),
+          differentResources.iterator().next().getRam());
+
+      Assert.assertEquals(
+          (containerCpu - RoundRobinPacking.DEFAULT_CPU_PADDING_PER_CONTAINER) 
/ instancesCount,
+          differentResources.iterator().next().getCpu(), DELTA);
+    }
+  }
+
   /**
    * Test the scenario RAM map config is partially set
    */

Reply via email to