nwangtw closed pull request #3152: Fix RoundRobinPacking repack with no
specified numContainers
URL: https://github.com/apache/incubator-heron/pull/3152
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 0a80440de8..1056504134 100644
---
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -390,10 +390,18 @@ private void validatePackingPlan(PackingPlan plan) throws
PackingException {
}
}
- /*
- * read the current packing plan with update parallelism to calculate a new
packing plan
- * the packing algorithm packInternal() is shared with pack()
+ /**
+ * Read the current packing plan with update parallelism to calculate a new
packing plan.
+ * This method should determine a new number of containers based on the
updated parallism
+ * while remaining the number of instances per container <= that of the old
packing plan.
+ * The packing algorithm packInternal() is shared with pack()
* delegate to packInternal() with the new container count and component
parallelism
+ *
+ * @param currentPackingPlan Existing packing plan
+ * @param componentChanges Map < componentName, new component parallelism
>
+ * that contains the parallelism for each component whose parallelism has
changed.
+ * @return new packing plan
+ * @throws PackingException
*/
@Override
public PackingPlan repack(PackingPlan currentPackingPlan, Map<String,
Integer> componentChanges)
@@ -402,11 +410,10 @@ public PackingPlan repack(PackingPlan currentPackingPlan,
Map<String, Integer> c
int initialNumInstance = TopologyUtils.getTotalInstance(topology);
double initialNumInstancePerContainer = (double) initialNumInstance /
initialNumContainer;
- Map<String, Integer> currentComponentParallelism =
currentPackingPlan.getComponentCounts();
Map<String, Integer> newComponentParallelism =
getNewComponentParallelism(currentPackingPlan, componentChanges);
- int newNumInstance =
TopologyUtils.getTotalInstance(currentComponentParallelism);
+ int newNumInstance =
TopologyUtils.getTotalInstance(newComponentParallelism);
int newNumContainer = (int) Math.ceil(newNumInstance /
initialNumInstancePerContainer);
return packInternal(newNumContainer, newComponentParallelism);
}
@@ -421,6 +428,20 @@ public PackingPlan repack(PackingPlan currentPackingPlan,
Map<String, Integer> c
return currentComponentParallelism;
}
+ /**
+ * Read the current packing plan with update parallelism and number of
containers
+ * to calculate a new packing plan.
+ * The packing algorithm packInternal() is shared with pack()
+ * delegate to packInternal() with the new container count and component
parallelism
+ *
+ * @param currentPackingPlan Existing packing plan
+ * @param containers < the new number of containers for the topology
+ * specified by the user
+ * @param componentChanges Map < componentName, new component parallelism
>
+ * that contains the parallelism for each component whose parallelism has
changed.
+ * @return new packing plan
+ * @throws PackingException
+ */
@Override
public PackingPlan repack(PackingPlan currentPackingPlan, int containers,
Map<String, Integer>
componentChanges) throws PackingException {
diff --git
a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
index 742c6a4987..62f030ed0d 100644
---
a/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
+++
b/heron/packing/tests/java/org/apache/heron/packing/roundrobin/RoundRobinPackingTest.java
@@ -346,10 +346,10 @@ public void testEvenPacking() throws Exception {
/**
- * test re-packing of instances
+ * test re-packing with same total instances
*/
@Test
- public void testRePacking() throws Exception {
+ public void testRepackingWithSameTotalInstances() throws Exception {
int numContainers = 2;
int componentParallelism = 4;
@@ -388,6 +388,94 @@ public void testRePacking() throws Exception {
Assert.assertEquals(componentParallelism + 1, boltCount);
}
+ /**
+ * test re-packing with more total instances
+ */
+ @Test
+ public void testRepackingWithMoreTotalInstances() throws Exception {
+ int numContainers = 2;
+ int componentParallelism = 4;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new
org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS,
numContainers);
+
+ TopologyAPI.Topology topology =
+ getTopology(componentParallelism, componentParallelism,
topologyConfig);
+
+ int numInstance = TopologyUtils.getTotalInstance(topology);
+ // Two components
+ Assert.assertEquals(2 * componentParallelism, numInstance);
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, +1);
+ componentChanges.put(BOLT_NAME, +1);
+ PackingPlan output = getRoundRobinRePackingPlan(topology,
componentChanges);
+ Assert.assertEquals(numContainers + 1, output.getContainers().size());
+ Assert.assertEquals((Integer) (numInstance + 2),
output.getInstanceCount());
+
+ int spoutCount = 0;
+ int boltCount = 0;
+ for (PackingPlan.ContainerPlan container : output.getContainers()) {
+ Assert.assertTrue((double) container.getInstances().size()
+ <= (double) numInstance / numContainers);
+
+ for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+ if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+ spoutCount++;
+ } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+ boltCount++;
+ }
+ }
+ }
+ Assert.assertEquals(componentParallelism + 1, spoutCount);
+ Assert.assertEquals(componentParallelism + 1, boltCount);
+ }
+
+ /**
+ * test re-packing with fewer total instances
+ */
+ @Test
+ public void testRepackingWithFewerTotalInstances() throws Exception {
+ int numContainers = 2;
+ int componentParallelism = 4;
+
+ // Set up the topology and its config
+ org.apache.heron.api.Config topologyConfig = new
org.apache.heron.api.Config();
+ topologyConfig.put(org.apache.heron.api.Config.TOPOLOGY_STMGRS,
numContainers);
+
+ TopologyAPI.Topology topology =
+ getTopology(componentParallelism, componentParallelism,
topologyConfig);
+
+ int numInstance = TopologyUtils.getTotalInstance(topology);
+ // Two components
+ Assert.assertEquals(2 * componentParallelism, numInstance);
+
+ Map<String, Integer> componentChanges = new HashMap<>();
+ componentChanges.put(SPOUT_NAME, -2);
+ componentChanges.put(BOLT_NAME, -2);
+ PackingPlan output = getRoundRobinRePackingPlan(topology,
componentChanges);
+ Assert.assertEquals(numContainers - 1, output.getContainers().size());
+ Assert.assertEquals((Integer) (numInstance - 4),
output.getInstanceCount());
+
+ int spoutCount = 0;
+ int boltCount = 0;
+ for (PackingPlan.ContainerPlan container : output.getContainers()) {
+ Assert.assertTrue((double) container.getInstances().size()
+ <= (double) numInstance / numContainers);
+
+ for (PackingPlan.InstancePlan instancePlan : container.getInstances()) {
+ if (SPOUT_NAME.equals(instancePlan.getComponentName())) {
+ spoutCount++;
+ } else if (BOLT_NAME.equals(instancePlan.getComponentName())) {
+ boltCount++;
+ }
+ }
+ }
+ Assert.assertEquals(componentParallelism - 2, spoutCount);
+ Assert.assertEquals(componentParallelism - 2, boltCount);
+ }
+
private static void assertComponentCount(
PackingPlan.ContainerPlan containerPlan, String componentName, int
expectedCount) {
int count = 0;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services