Repository: apex-core Updated Branches: refs/heads/master 86142d0f4 -> b102f59aa
APEXCORE-474 In M*1 case, deploy the unifier in the same container as downstream operator's container Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/b102f59a Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/b102f59a Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/b102f59a Branch: refs/heads/master Commit: b102f59aa03cad50b4655618c1e9bcb6bb2f41c9 Parents: 86142d0 Author: Sandesh Hegde <[email protected]> Authored: Wed Sep 28 15:51:37 2016 -0700 Committer: Sandesh Hegde <[email protected]> Committed: Fri Sep 30 16:49:58 2016 -0700 ---------------------------------------------------------------------- .../stram/plan/physical/StreamMapping.java | 4 +- .../com/datatorrent/stram/LocalityTest.java | 2 +- .../datatorrent/stram/OutputUnifiedTest.java | 4 +- .../com/datatorrent/stram/PartitioningTest.java | 2 +- .../com/datatorrent/stram/StreamCodecTest.java | 9 +-- .../stram/StreamingContainerManagerTest.java | 28 +++---- .../stram/plan/physical/PhysicalPlanTest.java | 81 ++++++++++---------- 7 files changed, 62 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java index b5e357e..81d6d44 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java @@ -293,7 +293,7 @@ public class StreamMapping implements java.io.Serializable } } } - if (!separateUnifiers && ((pks == null || pks.mask == 0) || lastSingle)) { + if (!separateUnifiers && lastSingle) { if (finalUnifier == null) { finalUnifier = createUnifier(streamMeta, plan); } @@ -328,7 +328,7 @@ public class StreamMapping implements java.io.Serializable } // add new inputs for (PTOutput out : doperUnifierSources) { - addInput(unifier, out, pks); + addInput(unifier, out, (pks == null) || (pks.mask == 0) ? null : pks); } } } else { http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/LocalityTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/LocalityTest.java b/engine/src/test/java/com/datatorrent/stram/LocalityTest.java index 71c17d4..2d69e42 100644 --- a/engine/src/test/java/com/datatorrent/stram/LocalityTest.java +++ b/engine/src/test/java/com/datatorrent/stram/LocalityTest.java @@ -73,7 +73,7 @@ public class LocalityTest dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers); StreamingContainerManager scm = new StreamingContainerManager(dag); - Assert.assertEquals("number required containers", 7, scm.containerStartRequests.size()); + Assert.assertEquals("number required containers", 6, scm.containerStartRequests.size()); ResourceRequestHandler rr = new ResourceRequestHandler(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java index 2c4812a..12e4ba7 100644 --- a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java +++ b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java @@ -79,7 +79,7 @@ public class OutputUnifiedTest StreamingContainerManager scm = new StreamingContainerManager(dag); PhysicalPlan physicalPlan = scm.getPhysicalPlan(); List<PTContainer> containers = physicalPlan.getContainers(); - Assert.assertEquals("Number of containers", 6, containers.size()); + Assert.assertEquals("Number of containers", 5, containers.size()); assignContainers(scm, containers); @@ -141,7 +141,7 @@ public class OutputUnifiedTest StreamingContainerManager scm = new StreamingContainerManager(dag); PhysicalPlan physicalPlan = scm.getPhysicalPlan(); List<PTContainer> containers = physicalPlan.getContainers(); - Assert.assertEquals("Number of containers", 6, containers.size()); + Assert.assertEquals("Number of containers", 5, containers.size()); assignContainers(scm, containers); http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java index 9eef586..ecbeeb6 100644 --- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java +++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java @@ -298,7 +298,7 @@ public class PartitioningTest for (PTOperator oper : partitions) { containers.add(oper.getContainer()); } - Assert.assertTrue("Number of containers are 5", 5 == lc.dnmgr.getPhysicalPlan().getContainers().size()); + Assert.assertTrue("Number of containers are 4", 4 == lc.dnmgr.getPhysicalPlan().getContainers().size()); PTOperator splitPartition = partitions.get(0); PartitionLoadWatch.put(splitPartition, 1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index cebeade..4ff9e51 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -413,8 +413,7 @@ public class StreamCodecTest List<OperatorDeployInfo.OutputDeployInfo> otdis = odi.outputs; for (OperatorDeployInfo.OutputDeployInfo otdi : otdis) { String id = operator.getName() + " " + otdi.portName; - Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 1); - checkPresentStreamCodec(n3meta, node3.inport1, otdi.streamCodecs, id, plan); + Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 0); } } } @@ -494,7 +493,7 @@ public class StreamCodecTest PhysicalPlan plan = dnm.getPhysicalPlan(); List<PTContainer> containers = plan.getContainers(); - Assert.assertEquals("number containers", 5, containers.size()); + Assert.assertEquals("number containers", 4, containers.size()); for (int i = 0; i < containers.size(); ++i) { StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1)); @@ -537,12 +536,12 @@ public class StreamCodecTest String id = operator.getName() + " " + idi.portName; Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1); checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan); + checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan); } List<OperatorDeployInfo.OutputDeployInfo> otdis = odi.outputs; for (OperatorDeployInfo.OutputDeployInfo otdi : otdis) { String id = operator.getName() + " " + otdi.portName; - Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 1); - checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan); + Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 0); } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 11b938c..39bfbf2 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -266,7 +266,7 @@ public class StreamingContainerManagerTest // // ,---> node2----, // | | - // node1---+---> node2----+--->unifier--->node3 + // node1---+---> node2----+---> unifier | node3 // | | // '---> node2----' // @@ -288,7 +288,7 @@ public class StreamingContainerManagerTest StreamingContainerManager dnm = new StreamingContainerManager(dag); PhysicalPlan plan = dnm.getPhysicalPlan(); - Assert.assertEquals("number containers", 6, plan.getContainers().size()); + Assert.assertEquals("number containers", 5, plan.getContainers().size()); List<StreamingContainerAgent> containerAgents = Lists.newArrayList(); for (int i = 0; i < plan.getContainers().size(); i++) { containerAgents.add(assignContainer(dnm, "container" + (i + 1))); @@ -321,11 +321,8 @@ public class StreamingContainerManagerTest Assert.assertEquals("number stream codecs for " + nidi, 1, nidi.streamCodecs.size()); } - // unifier - List<PTOperator> o2Unifiers = plan.getMergeOperators(dag.getMeta(node2)); - Assert.assertEquals("number unifiers", 1, o2Unifiers.size()); - List<OperatorDeployInfo> cUnifier = getDeployInfo(dnm.getContainerAgent(o2Unifiers.get(0).getContainer().getExternalId())); - Assert.assertEquals("number operators " + cUnifier, 1, cUnifier.size()); + List<OperatorDeployInfo> cUnifier = getDeployInfo(dnm.getContainerAgent(plan.getOperators(dag.getMeta(node3)).get(0).getContainer().getExternalId())); + Assert.assertEquals("number operators " + cUnifier, 2, cUnifier.size()); OperatorDeployInfo mergeNodeDI = getNodeDeployInfo(cUnifier, dag.getMeta(node2).getMeta(node2.outport1).getUnifierMeta()); Assert.assertNotNull("unifier for " + node2, mergeNodeDI); @@ -361,7 +358,7 @@ public class StreamingContainerManagerTest // node3 container c = plan.getOperators(dag.getMeta(node3)).get(0).getContainer(); List<OperatorDeployInfo> cmerge = getDeployInfo(dnm.getContainerAgent(c.getExternalId())); - Assert.assertEquals("number operators " + cmerge, 1, cmerge.size()); + Assert.assertEquals("number operators " + cmerge, 2, cmerge.size()); OperatorDeployInfo node3DI = getNodeDeployInfo(cmerge, dag.getMeta(node3)); Assert.assertNotNull(dag.getMeta(node3) + " assigned", node3DI); @@ -764,15 +761,12 @@ public class StreamingContainerManagerTest PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0); MockContainer mc4 = mockContainers.get(o3p1.getContainer()); MockOperatorStats o3p1mos = mc4.stats(o3p1.getId()); - o3p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); - mc4.sendHeartbeat(); - // unifier - PTOperator unifier = physicalPlan.getMergeOperators(dag.getMeta(o2)).get(0); - MockContainer mc5 = mockContainers.get(unifier.getContainer()); - MockOperatorStats unifierp1mos = mc5.stats(unifier.getId()); + MockOperatorStats unifierp1mos = mc4.stats(o3p1.upstreamMerge.values().iterator().next().getId()); unifierp1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); - mc5.sendHeartbeat(); + + o3p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); + mc4.sendHeartbeat(); o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN); mc1.sendHeartbeat(); @@ -781,7 +775,7 @@ public class StreamingContainerManagerTest scm.monitorHeartbeat(); // committedWindowId updated in next cycle Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId()); scm.processEvents(); - Assert.assertEquals("containers at committedWindowId=1", 5, physicalPlan.getContainers().size()); + Assert.assertEquals("containers at committedWindowId=1", 4, physicalPlan.getContainers().size()); // checkpoint window 2 o1p1mos.checkpointWindowId(2); @@ -794,10 +788,10 @@ public class StreamingContainerManagerTest o2p2mos.currentWindowId(2).checkpointWindowId(2); o3p1mos.currentWindowId(2).checkpointWindowId(2); unifierp1mos.currentWindowId(2).checkpointWindowId(2); + mc2.sendHeartbeat(); mc3.sendHeartbeat(); mc4.sendHeartbeat(); - mc5.sendHeartbeat(); scm.monitorHeartbeat(); // Operators are shutdown when both operators reach window Id 2 http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java index 8d19640..e426194 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java @@ -263,10 +263,7 @@ public class PhysicalPlanTest } Collection<PTOperator> unifiers = plan.getMergeOperators(partitionedMeta); - Assert.assertEquals("number unifiers " + partitionedMeta, 1, unifiers.size()); - PTOperator unifier = unifiers.iterator().next(); - Assert.assertNotNull("unifier container " + unifier, unifier.getContainer()); - Assert.assertEquals("unifier inputs " + unifier, partitioned.partitionKeys.length, unifier.inputs.size()); + Assert.assertEquals("number unifiers " + partitionedMeta, 0, unifiers.size()); } @Test @@ -414,7 +411,8 @@ public class PhysicalPlanTest Set<PTOperator> expUndeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode))); expUndeploy.add(o2p1); - expUndeploy.addAll(plan.getMergeOperators(o2Meta)); + + expUndeploy.addAll(plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values()); // verify load update generates expected events per configuration @@ -446,7 +444,8 @@ public class PhysicalPlanTest Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode))); expDeploy.addAll(plan.getOperators(o2Meta)); expDeploy.remove(o2p2); - expDeploy.addAll(plan.getMergeOperators(o2Meta)); + + expDeploy.addAll(plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values()); Assert.assertEquals("" + ctx.deploy, expDeploy, ctx.deploy); Assert.assertEquals("Count of storage requests", 2, ctx.backupRequests); @@ -513,6 +512,7 @@ public class PhysicalPlanTest Set<PTOperator> expDeploy = Sets.newHashSet(o1Partitions.get(1)); expDeploy.addAll(plan.getMergeOperators(dag.getMeta(o1))); expDeploy.addAll(expUndeploy); + expDeploy.add(o1p1.getOutputs().get(0).sinks.get(0).target); Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy); Assert.assertEquals("deploy", expDeploy, ctx.deploy); @@ -565,7 +565,8 @@ public class PhysicalPlanTest Collection<PTOperator> unifiers = plan.getMergeOperators(node2Meta); Assert.assertEquals("unifiers " + node2Meta, 0, unifiers.size()); - Collection<PTOperator> o3unifiers = plan.getMergeOperators(o3Meta); + Collection<PTOperator> o3unifiers = plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values(); + Assert.assertEquals("unifiers " + o3Meta, 1, o3unifiers.size()); PTOperator o3unifier = o3unifiers.iterator().next(); Assert.assertEquals("unifier inputs " + o3unifier, 8, o3unifier.getInputs().size()); @@ -573,7 +574,7 @@ public class PhysicalPlanTest Set<PTOperator> expUndeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode))); expUndeploy.addAll(n2Instances); expUndeploy.addAll(plan.getOperators(o3Meta)); - expUndeploy.addAll(plan.getMergeOperators(o3Meta)); + expUndeploy.addAll(o3unifiers); // verify load update generates expected events per configuration Assert.assertEquals("stats handlers " + po, 1, po.statsListeners.size()); @@ -629,10 +630,12 @@ public class PhysicalPlanTest Assert.assertEquals("" + ctx.undeploy, expUndeploy, ctx.undeploy); + o3unifiers = plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values(); + Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode))); expDeploy.addAll(plan.getOperators(node2Meta)); expDeploy.addAll(plan.getOperators(o3Meta)); - expDeploy.addAll(plan.getMergeOperators(o3Meta)); + expDeploy.addAll(o3unifiers); Assert.assertEquals("" + ctx.deploy, expDeploy, ctx.deploy); for (PTOperator oper : ctx.deploy) { @@ -667,9 +670,9 @@ public class PhysicalPlanTest PTOperator p1Doper = o1p1.getOutputs().get(0).sinks.get(0).target; Assert.assertSame("", p1Doper.getOperatorMeta(), o1Meta.getMeta(o1.output).getUnifierMeta()); Assert.assertTrue("unifier ", p1Doper.isUnifier()); + Assert.assertEquals("Unifiers " + o1Meta, 1, o1p1.getOutputs().get(0).sinks.size()); - Collection<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta); - Assert.assertEquals("unifiers " + o1Meta, 1, o1Unifiers.size()); + Collection<PTOperator> o1Unifiers = new ArrayList<>(plan.getOperators(dag.getMeta(o2)).get(0).upstreamMerge.values()); StatsListener l = o1p1.statsListeners.get(0); Assert.assertTrue("stats handlers " + o1p1.statsListeners, l instanceof PartitioningTest.PartitionLoadWatch); @@ -684,7 +687,8 @@ public class PhysicalPlanTest List<PTOperator> o1NewPartitions = plan.getOperators(o1Meta); Assert.assertEquals("partitions " + o1NewPartitions, 1, o1NewPartitions.size()); - List<PTOperator> o1NewUnifiers = plan.getMergeOperators(o1Meta); + List<PTOperator> o1NewUnifiers = new ArrayList<>(plan.getOperators(dag.getMeta(o2)).get(0).upstreamMerge.values()); + Assert.assertEquals("unifiers " + o1Meta, 0, o1NewUnifiers.size()); p1Doper = o1p1.getOutputs().get(0).sinks.get(0).target; Assert.assertTrue("", p1Doper.getOperatorMeta() == dag.getMeta(o2)); @@ -700,7 +704,8 @@ public class PhysicalPlanTest Assert.assertEquals("partition scaling triggered", 1, ctx.events.size()); ctx.events.remove(0).run(); - o1NewUnifiers = plan.getMergeOperators(o1Meta); + o1NewUnifiers.addAll(plan.getOperators(dag.getMeta(o2)).get(0).upstreamMerge.values()); + Assert.assertEquals("unifiers " + o1Meta, 1, o1NewUnifiers.size()); Assert.assertEquals("unifier activation checkpoint " + o1Meta, 3, o1NewUnifiers.get(0).recoveryCheckpoint.windowId); } @@ -938,7 +943,7 @@ public class PhysicalPlanTest GenericTestOperator single = dag.addOperator("single", GenericTestOperator.class); dag.addStream("partitionedParallel_outport1", partitionedParallel.outport1, single.inport1); - int maxContainers = 7; + int maxContainers = 6; dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers); dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext()); @@ -1017,12 +1022,12 @@ public class PhysicalPlanTest GenericTestOperator o5single = dag.addOperator("o5single", GenericTestOperator.class); dag.addStream("o4outport1", o4.outport1, o5single.inport1); - int maxContainers = 5; + int maxContainers = 4; dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers); dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext()); PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext()); - Assert.assertEquals("number of containers", 5, plan.getContainers().size()); + Assert.assertEquals("number of containers", 4, plan.getContainers().size()); PTContainer container1 = plan.getContainers().get(0); Assert.assertEquals("number operators " + container1, 1, container1.getOperators().size()); @@ -1049,25 +1054,24 @@ public class PhysicalPlanTest } } - // container 4: merge operator for o4 - Collection<PTOperator> o4Unifiers = plan.getMergeOperators(o4Meta); - Assert.assertEquals("unifier " + o4Meta + ": " + o4Unifiers, 1, o4Unifiers.size()); + // container 4: Unifier for o4 & O5 PTContainer container4 = plan.getContainers().get(3); - Assert.assertEquals("number operators " + container4, 1, container4.getOperators().size()); - Assert.assertEquals("operators " + container4, o4Meta.getMeta(o4.outport1).getUnifierMeta(), container4.getOperators().get(0).getOperatorMeta()); - Assert.assertTrue("unifier " + o4, container4.getOperators().get(0).isUnifier()); - Assert.assertEquals("unifier inputs" + container4.getOperators().get(0).getInputs(), 2, container4.getOperators().get(0).getInputs().size()); - Assert.assertEquals("unifier outputs" + container4.getOperators().get(0).getOutputs(), 1, container4.getOperators().get(0).getOutputs().size()); - // container 5: o5 taking input from o4 unifier + PTOperator ptOperatorO5 = plan.getOperators(dag.getMeta(o5single)).get(0); + PTOperator unifier = ptOperatorO5.upstreamMerge.values().iterator().next(); + + Assert.assertEquals("number operators " + container4, 2, container4.getOperators().size()); + Assert.assertEquals("operators " + container4, o4Meta.getMeta(o4.outport1).getUnifierMeta(), unifier.getOperatorMeta()); + + Assert.assertEquals("unifier inputs" + unifier.getInputs(), 2, unifier.getInputs().size()); + Assert.assertEquals("unifier outputs" + unifier.getOutputs(), 1, unifier.getOutputs().size()); + OperatorMeta o5Meta = dag.getMeta(o5single); - PTContainer container5 = plan.getContainers().get(4); - Assert.assertEquals("number operators " + container5, 1, container5.getOperators().size()); - Assert.assertEquals("operators " + container5, o5Meta, container5.getOperators().get(0).getOperatorMeta()); + Assert.assertEquals("operators " + container4, o5Meta, ptOperatorO5.getOperatorMeta()); List<PTOperator> o5Instances = plan.getOperators(o5Meta); Assert.assertEquals("" + o5Instances, 1, o5Instances.size()); - Assert.assertEquals("inputs" + container5.getOperators().get(0).getInputs(), 1, container5.getOperators().get(0).getInputs().size()); - Assert.assertEquals("inputs" + container5.getOperators().get(0).getInputs(), container4.getOperators().get(0), container5.getOperators().get(0).getInputs().get(0).source.source); + Assert.assertEquals("inputs" + ptOperatorO5.getInputs(), 1, ptOperatorO5.getInputs().size()); + Assert.assertEquals("inputs" + ptOperatorO5.getInputs(), unifier, ptOperatorO5.getInputs().get(0).source.source); // verify partitioner was called for parallel partition Assert.assertNotNull("partitioner called " + o3_1, o3_1.partitions); @@ -1214,8 +1218,8 @@ public class PhysicalPlanTest for (PTOperator ptOperator : plan.getOperators(o2Meta)) { expDeploy.addAll(ptOperator.upstreamMerge.values()); } - // from 3 to 2 the containers decrease from 5 to 4, but from 2 to 1 the container remains same because single unifier are not inline with single operator partition - Assert.assertEquals("number of containers", 4, plan.getContainers().size()); + + Assert.assertEquals("number of containers", 4 - i, plan.getContainers().size()); Assert.assertEquals("number of operators", 2 - i, plan.getOperators(o2Meta).size()); Assert.assertEquals("undeployed operators " + ctx.undeploy, expUndeploy, ctx.undeploy); Assert.assertEquals("deployed operators " + ctx.deploy, expDeploy, ctx.deploy); @@ -1550,19 +1554,16 @@ public class PhysicalPlanTest dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx); PhysicalPlan plan = new PhysicalPlan(dag, ctx); - Assert.assertEquals("number of containers", 5, plan.getContainers().size()); + Assert.assertEquals("number of containers", 4, plan.getContainers().size()); List<PTOperator> o1ops = plan.getOperators(o1Meta); Assert.assertEquals("number of o1 operators", 3, o1ops.size()); List<PTOperator> o2ops = plan.getOperators(o2Meta); Assert.assertEquals("number of o2 operators", 1, o2ops.size()); - - List<PTOperator> uops = plan.getMergeOperators(o1Meta); - Set<PTOperator> expUndeploy = Sets.newLinkedHashSet(); expUndeploy.addAll(plan.getOperators(o2Meta)); - expUndeploy.addAll(uops); + expUndeploy.add(plan.getOperators(o2Meta).get(0).upstreamMerge.values().iterator().next()); for (int i = 0; i < 2; ++i) { PartitioningTest.PartitionLoadWatch.put(o1ops.get(i), 1); @@ -1572,7 +1573,7 @@ public class PhysicalPlanTest ctx.backupRequests = 0; ctx.events.remove(0).run(); - Assert.assertEquals("number of containers", 7, plan.getContainers().size()); + Assert.assertEquals("number of containers", 6, plan.getContainers().size()); Assert.assertEquals("undeployed opertors", expUndeploy, ctx.undeploy); } @@ -2221,7 +2222,7 @@ public class PhysicalPlanTest dag.addStream("o1.outport1", o1.outport1, o2.inport1); dag.addStream("o2.outport1", o2.outport1, o3.inport1); PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext()); - Assert.assertEquals("number of containers", 10, plan.getContainers().size()); + Assert.assertEquals("number of containers", 9, plan.getContainers().size()); } @Test @@ -2241,6 +2242,6 @@ public class PhysicalPlanTest dag.addStream("o1.outport1", o1.outport1, o2.inport1); dag.addStream("o2.outport1", o2.outport1, o3.inport1); PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext()); - Assert.assertEquals("number of containers", 8, plan.getContainers().size()); + Assert.assertEquals("number of containers", 7, plan.getContainers().size()); } }
