nwangtw closed pull request #3152: Fix RoundRobinPacking repack with no 
specified numContainers
URL: https://github.com/apache/incubator-heron/pull/3152
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
 
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 0a80440de8..1056504134 100644
--- 
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ 
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -390,10 +390,18 @@ private void validatePackingPlan(PackingPlan plan) throws 
PackingException {
     }
   }
 
-  /*
-   * read the current packing plan with update parallelism to calculate a new 
packing plan
-   * the packing algorithm packInternal() is shared with pack()
+  /**
+   * Read the current packing plan with update parallelism to calculate a new 
packing plan.
+   * This method should determine a new number of containers based on the 
updated parallelism
+   * while keeping the number of instances per container <= that of the old 
packing plan.
+   * The packing algorithm packInternal() is shared with pack()
    * delegate to packInternal() with the new container count and component 
parallelism
+   *
+   * @param currentPackingPlan Existing packing plan
+   * @param componentChanges Map &lt; componentName, new component parallelism 
&gt;
+   * that contains the parallelism for each component whose parallelism has 
changed.
+   * @return new packing plan
+   * @throws PackingException
    */
   @Override
   public PackingPlan repack(PackingPlan currentPackingPlan, Map<String, 
Integer> componentChanges)
@@ -402,11 +410,10 @@ public PackingPlan repack(PackingPlan currentPackingPlan, 
Map<String, Integer> c
     int initialNumInstance = TopologyUtils.getTotalInstance(topology);
     double initialNumInstancePerContainer = (double) initialNumInstance / 
initialNumContainer;
 
-    Map<String, Integer> currentComponentParallelism = 
currentPackingPlan.getComponentCounts();
     Map<String, Integer> newComponentParallelism =
         getNewComponentParallelism(currentPackingPlan, componentChanges);
 
-    int newNumInstance = 
TopologyUtils.getTotalInstance(currentComponentParallelism);
+    int newNumInstance = 
TopologyUtils.getTotalInstance(newComponentParallelism);
     int newNumContainer = (int) Math.ceil(newNumInstance / 
initialNumInstancePerContainer);
     return packInternal(newNumContainer, newComponentParallelism);
   }
@@ -421,6 +428,20 @@ public PackingPlan repack(PackingPlan currentPackingPlan, 
Map<String, Integer> c
     return currentComponentParallelism;
   }
 
+  /**
+   * Read the current packing plan with update parallelism and number of 
containers
+   * to calculate a new packing plan.
+   * The packing algorithm packInternal() is shared with pack()
+   * delegate to packInternal() with the new container count and component 
parallelism
+   *
+   * @param currentPackingPlan Existing packing plan
+   * @param containers the new number of containers for the topology
+   * specified by the user
+   * @param componentChanges Map &lt; componentName, new component parallelism 
&gt;
+   * that contains the parallelism for each component whose parallelism has 
changed.
+   * @return new packing plan
+   * @throws PackingException
+   */
   @Override
   public PackingPlan repack(PackingPlan currentPackingPlan, int containers, 
Map<String, Integer>
       componentChanges) throws PackingException {
diff --git 
a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
 
b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 742c6a4987..62f030ed0d 100644
--- 
a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++ 
b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -346,10 +346,10 @@ public void testEvenPacking() throws Exception {
 
 
   /**
-   * test re-packing of instances
+   * test re-packing with same total instances
    */
   @Test
-  public void testRePacking() throws Exception {
+  public void testRepackingWithSameTotalInstances() throws Exception {
     int numContainers = 2;
     int componentParallelism = 4;
 
@@ -388,6 +388,94 @@ public void testRePacking() throws Exception {
     Assert.assertEquals(componentParallelism + 1, boltCount);
   }
 
+  /**
+   * test re-packing with more total instances
+   */
+  @Test
+  public void testRepackingWithMoreTotalInstances() throws Exception {
+    int numContainers = 2;
+    int componentParallelism = 4;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new 
org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, 
numContainers);
+
+    TopologyAPI.Topology topology =
+        getTopology(componentParallelism, componentParallelism, 
topologyConfig);
+
+    int numInstance = TopologyUtils.getTotalInstance(topology);
+    // Two components
+    Assert.assertEquals(2 * componentParallelism, numInstance);
+
+    Map<String, Integer> componentChanges = new HashMap<>();
+    componentChanges.put(SPOUT_NAME, +1);
+    componentChanges.put(BOLT_NAME,  +1);
+    PackingPlan output = getRoundRobinRePackingPlan(topology, 
componentChanges);
+    Assert.assertEquals(numContainers + 1, output.getContainers().size());
+    Assert.assertEquals((Integer) (numInstance + 2), 
output.getInstanceCount());
+
+    int spoutCount = 0;
+    int boltCount = 0;
+    for (PackingPlan.ContainerPlan container : output.getContainers()) {
+      Assert.assertTrue((double) container.getInstances().size()
+          <= (double) numInstance / numContainers);
+
+      for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+        if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+          spoutCount++;
+        } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+          boltCount++;
+        }
+      }
+    }
+    Assert.assertEquals(componentParallelism + 1, spoutCount);
+    Assert.assertEquals(componentParallelism + 1, boltCount);
+  }
+
+  /**
+   * test re-packing with fewer total instances
+   */
+  @Test
+  public void testRepackingWithFewerTotalInstances() throws Exception {
+    int numContainers = 2;
+    int componentParallelism = 4;
+
+    // Set up the topology and its config
+    org.apache.heron.api.Config topologyConfig = new 
org.apache.heron.api.Config();
+    topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS, 
numContainers);
+
+    TopologyAPI.Topology topology =
+        getTopology(componentParallelism, componentParallelism, 
topologyConfig);
+
+    int numInstance = TopologyUtils.getTotalInstance(topology);
+    // Two components
+    Assert.assertEquals(2 * componentParallelism, numInstance);
+
+    Map<String, Integer> componentChanges = new HashMap<>();
+    componentChanges.put(SPOUT_NAME, -2);
+    componentChanges.put(BOLT_NAME,  -2);
+    PackingPlan output = getRoundRobinRePackingPlan(topology, 
componentChanges);
+    Assert.assertEquals(numContainers - 1, output.getContainers().size());
+    Assert.assertEquals((Integer) (numInstance - 4), 
output.getInstanceCount());
+
+    int spoutCount = 0;
+    int boltCount = 0;
+    for (PackingPlan.ContainerPlan container : output.getContainers()) {
+      Assert.assertTrue((double) container.getInstances().size()
+          <= (double) numInstance / numContainers);
+
+      for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+        if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+          spoutCount++;
+        } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+          boltCount++;
+        }
+      }
+    }
+    Assert.assertEquals(componentParallelism - 2, spoutCount);
+    Assert.assertEquals(componentParallelism - 2, boltCount);
+  }
+
   private static void assertComponentCount(
       PackingPlan.ContainerPlan containerPlan, String componentName, int 
expectedCount) {
     int count = 0;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to