Repository: apex-core
Updated Branches:
  refs/heads/master 86142d0f4 -> b102f59aa


APEXCORE-474 In the M*1 case, deploy the unifier in the same container as the 
downstream operator


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/b102f59a
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/b102f59a
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/b102f59a

Branch: refs/heads/master
Commit: b102f59aa03cad50b4655618c1e9bcb6bb2f41c9
Parents: 86142d0
Author: Sandesh Hegde <[email protected]>
Authored: Wed Sep 28 15:51:37 2016 -0700
Committer: Sandesh Hegde <[email protected]>
Committed: Fri Sep 30 16:49:58 2016 -0700

----------------------------------------------------------------------
 .../stram/plan/physical/StreamMapping.java      |  4 +-
 .../com/datatorrent/stram/LocalityTest.java     |  2 +-
 .../datatorrent/stram/OutputUnifiedTest.java    |  4 +-
 .../com/datatorrent/stram/PartitioningTest.java |  2 +-
 .../com/datatorrent/stram/StreamCodecTest.java  |  9 +--
 .../stram/StreamingContainerManagerTest.java    | 28 +++----
 .../stram/plan/physical/PhysicalPlanTest.java   | 81 ++++++++++----------
 7 files changed, 62 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index b5e357e..81d6d44 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -293,7 +293,7 @@ public class StreamMapping implements java.io.Serializable
               }
             }
           }
-          if (!separateUnifiers && ((pks == null || pks.mask == 0) || 
lastSingle)) {
+          if (!separateUnifiers && lastSingle) {
             if (finalUnifier == null) {
               finalUnifier = createUnifier(streamMeta, plan);
             }
@@ -328,7 +328,7 @@ public class StreamMapping implements java.io.Serializable
             }
             // add new inputs
             for (PTOutput out : doperUnifierSources) {
-              addInput(unifier, out, pks);
+              addInput(unifier, out, (pks == null) || (pks.mask == 0) ? null : 
pks);
             }
           }
         } else {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/LocalityTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LocalityTest.java 
b/engine/src/test/java/com/datatorrent/stram/LocalityTest.java
index 71c17d4..2d69e42 100644
--- a/engine/src/test/java/com/datatorrent/stram/LocalityTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LocalityTest.java
@@ -73,7 +73,7 @@ public class LocalityTest
     dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
 
     StreamingContainerManager scm = new StreamingContainerManager(dag);
-    Assert.assertEquals("number required containers", 7, 
scm.containerStartRequests.size());
+    Assert.assertEquals("number required containers", 6, 
scm.containerStartRequests.size());
 
     ResourceRequestHandler rr = new ResourceRequestHandler();
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java 
b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
index 2c4812a..12e4ba7 100644
--- a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
@@ -79,7 +79,7 @@ public class OutputUnifiedTest
     StreamingContainerManager scm = new StreamingContainerManager(dag);
     PhysicalPlan physicalPlan = scm.getPhysicalPlan();
     List<PTContainer> containers = physicalPlan.getContainers();
-    Assert.assertEquals("Number of containers", 6, containers.size());
+    Assert.assertEquals("Number of containers", 5, containers.size());
 
     assignContainers(scm, containers);
 
@@ -141,7 +141,7 @@ public class OutputUnifiedTest
     StreamingContainerManager scm = new StreamingContainerManager(dag);
     PhysicalPlan physicalPlan = scm.getPhysicalPlan();
     List<PTContainer> containers = physicalPlan.getContainers();
-    Assert.assertEquals("Number of containers", 6, containers.size());
+    Assert.assertEquals("Number of containers", 5, containers.size());
 
     assignContainers(scm, containers);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java 
b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index 9eef586..ecbeeb6 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -298,7 +298,7 @@ public class PartitioningTest
     for (PTOperator oper : partitions) {
       containers.add(oper.getContainer());
     }
-    Assert.assertTrue("Number of containers are 5", 5 == 
lc.dnmgr.getPhysicalPlan().getContainers().size());
+    Assert.assertTrue("Number of containers are 4", 4 == 
lc.dnmgr.getPhysicalPlan().getContainers().size());
 
     PTOperator splitPartition = partitions.get(0);
     PartitionLoadWatch.put(splitPartition, 1);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java 
b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index cebeade..4ff9e51 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -413,8 +413,7 @@ public class StreamCodecTest
           List<OperatorDeployInfo.OutputDeployInfo> otdis = odi.outputs;
           for (OperatorDeployInfo.OutputDeployInfo otdi : otdis) {
             String id = operator.getName() + " " + otdi.portName;
-            Assert.assertEquals("number stream codecs " + id, 
otdi.streamCodecs.size(), 1);
-            checkPresentStreamCodec(n3meta, node3.inport1, otdi.streamCodecs, 
id, plan);
+            Assert.assertEquals("number stream codecs " + id, 
otdi.streamCodecs.size(), 0);
           }
         }
       }
@@ -494,7 +493,7 @@ public class StreamCodecTest
     PhysicalPlan plan = dnm.getPhysicalPlan();
 
     List<PTContainer> containers = plan.getContainers();
-    Assert.assertEquals("number containers", 5, containers.size());
+    Assert.assertEquals("number containers", 4, containers.size());
 
     for (int i = 0; i < containers.size(); ++i) {
       StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 
1));
@@ -537,12 +536,12 @@ public class StreamCodecTest
             String id = operator.getName() + " " + idi.portName;
             Assert.assertEquals("number stream codecs " + id, 
idi.streamCodecs.size(), 1);
             checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, 
id, plan);
+            checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, 
id, plan);
           }
           List<OperatorDeployInfo.OutputDeployInfo> otdis = odi.outputs;
           for (OperatorDeployInfo.OutputDeployInfo otdi : otdis) {
             String id = operator.getName() + " " + otdi.portName;
-            Assert.assertEquals("number stream codecs " + id, 
otdi.streamCodecs.size(), 1);
-            checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, 
id, plan);
+            Assert.assertEquals("number stream codecs " + id, 
otdi.streamCodecs.size(), 0);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 11b938c..39bfbf2 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -266,7 +266,7 @@ public class StreamingContainerManagerTest
     //
     //            ,---> node2----,
     //            |              |
-    //    node1---+---> node2----+--->unifier--->node3
+    //    node1---+---> node2----+---> unifier | node3
     //            |              |
     //            '---> node2----'
     //
@@ -288,7 +288,7 @@ public class StreamingContainerManagerTest
     StreamingContainerManager dnm = new StreamingContainerManager(dag);
     PhysicalPlan plan = dnm.getPhysicalPlan();
 
-    Assert.assertEquals("number containers", 6, plan.getContainers().size());
+    Assert.assertEquals("number containers", 5, plan.getContainers().size());
     List<StreamingContainerAgent> containerAgents = Lists.newArrayList();
     for (int i = 0; i < plan.getContainers().size(); i++) {
       containerAgents.add(assignContainer(dnm, "container" + (i + 1)));
@@ -321,11 +321,8 @@ public class StreamingContainerManagerTest
       Assert.assertEquals("number stream codecs for " + nidi, 1, 
nidi.streamCodecs.size());
     }
 
-    // unifier
-    List<PTOperator> o2Unifiers = plan.getMergeOperators(dag.getMeta(node2));
-    Assert.assertEquals("number unifiers", 1, o2Unifiers.size());
-    List<OperatorDeployInfo> cUnifier = 
getDeployInfo(dnm.getContainerAgent(o2Unifiers.get(0).getContainer().getExternalId()));
-    Assert.assertEquals("number operators " + cUnifier, 1, cUnifier.size());
+    List<OperatorDeployInfo> cUnifier = 
getDeployInfo(dnm.getContainerAgent(plan.getOperators(dag.getMeta(node3)).get(0).getContainer().getExternalId()));
+    Assert.assertEquals("number operators " + cUnifier, 2, cUnifier.size());
 
     OperatorDeployInfo mergeNodeDI = getNodeDeployInfo(cUnifier, 
dag.getMeta(node2).getMeta(node2.outport1).getUnifierMeta());
     Assert.assertNotNull("unifier for " + node2, mergeNodeDI);
@@ -361,7 +358,7 @@ public class StreamingContainerManagerTest
     // node3 container
     c = plan.getOperators(dag.getMeta(node3)).get(0).getContainer();
     List<OperatorDeployInfo> cmerge = 
getDeployInfo(dnm.getContainerAgent(c.getExternalId()));
-    Assert.assertEquals("number operators " + cmerge, 1, cmerge.size());
+    Assert.assertEquals("number operators " + cmerge, 2, cmerge.size());
 
     OperatorDeployInfo node3DI = getNodeDeployInfo(cmerge,  
dag.getMeta(node3));
     Assert.assertNotNull(dag.getMeta(node3) + " assigned", node3DI);
@@ -764,15 +761,12 @@ public class StreamingContainerManagerTest
     PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0);
     MockContainer mc4 = mockContainers.get(o3p1.getContainer());
     MockOperatorStats o3p1mos = mc4.stats(o3p1.getId());
-    
o3p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
-    mc4.sendHeartbeat();
 
-    // unifier
-    PTOperator unifier = 
physicalPlan.getMergeOperators(dag.getMeta(o2)).get(0);
-    MockContainer mc5 = mockContainers.get(unifier.getContainer());
-    MockOperatorStats unifierp1mos = mc5.stats(unifier.getId());
+    MockOperatorStats unifierp1mos = 
mc4.stats(o3p1.upstreamMerge.values().iterator().next().getId());
     
unifierp1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
-    mc5.sendHeartbeat();
+
+    
o3p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+    mc4.sendHeartbeat();
 
     o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
     mc1.sendHeartbeat();
@@ -781,7 +775,7 @@ public class StreamingContainerManagerTest
     scm.monitorHeartbeat(); // committedWindowId updated in next cycle
     Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
     scm.processEvents();
-    Assert.assertEquals("containers at committedWindowId=1", 5, 
physicalPlan.getContainers().size());
+    Assert.assertEquals("containers at committedWindowId=1", 4, 
physicalPlan.getContainers().size());
 
     // checkpoint window 2
     o1p1mos.checkpointWindowId(2);
@@ -794,10 +788,10 @@ public class StreamingContainerManagerTest
     o2p2mos.currentWindowId(2).checkpointWindowId(2);
     o3p1mos.currentWindowId(2).checkpointWindowId(2);
     unifierp1mos.currentWindowId(2).checkpointWindowId(2);
+
     mc2.sendHeartbeat();
     mc3.sendHeartbeat();
     mc4.sendHeartbeat();
-    mc5.sendHeartbeat();
     scm.monitorHeartbeat();
 
     // Operators are shutdown when both operators reach window Id 2

http://git-wip-us.apache.org/repos/asf/apex-core/blob/b102f59a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
 
b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 8d19640..e426194 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -263,10 +263,7 @@ public class PhysicalPlanTest
     }
 
     Collection<PTOperator> unifiers = plan.getMergeOperators(partitionedMeta);
-    Assert.assertEquals("number unifiers " + partitionedMeta, 1, 
unifiers.size());
-    PTOperator unifier = unifiers.iterator().next();
-    Assert.assertNotNull("unifier container " + unifier, 
unifier.getContainer());
-    Assert.assertEquals("unifier inputs " + unifier, 
partitioned.partitionKeys.length, unifier.inputs.size());
+    Assert.assertEquals("number unifiers " + partitionedMeta, 0, 
unifiers.size());
   }
 
   @Test
@@ -414,7 +411,8 @@ public class PhysicalPlanTest
 
     Set<PTOperator> expUndeploy = 
Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
     expUndeploy.add(o2p1);
-    expUndeploy.addAll(plan.getMergeOperators(o2Meta));
+
+    
expUndeploy.addAll(plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values());
 
     // verify load update generates expected events per configuration
 
@@ -446,7 +444,8 @@ public class PhysicalPlanTest
     Set<PTOperator> expDeploy = 
Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
     expDeploy.addAll(plan.getOperators(o2Meta));
     expDeploy.remove(o2p2);
-    expDeploy.addAll(plan.getMergeOperators(o2Meta));
+
+    
expDeploy.addAll(plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values());
 
     Assert.assertEquals("" + ctx.deploy, expDeploy, ctx.deploy);
     Assert.assertEquals("Count of storage requests", 2, ctx.backupRequests);
@@ -513,6 +512,7 @@ public class PhysicalPlanTest
     Set<PTOperator> expDeploy = Sets.newHashSet(o1Partitions.get(1));
     expDeploy.addAll(plan.getMergeOperators(dag.getMeta(o1)));
     expDeploy.addAll(expUndeploy);
+    expDeploy.add(o1p1.getOutputs().get(0).sinks.get(0).target);
 
     Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
     Assert.assertEquals("deploy", expDeploy, ctx.deploy);
@@ -565,7 +565,8 @@ public class PhysicalPlanTest
     Collection<PTOperator> unifiers = plan.getMergeOperators(node2Meta);
     Assert.assertEquals("unifiers " + node2Meta, 0, unifiers.size());
 
-    Collection<PTOperator> o3unifiers = plan.getMergeOperators(o3Meta);
+    Collection<PTOperator> o3unifiers = 
plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values();
+
     Assert.assertEquals("unifiers " + o3Meta, 1, o3unifiers.size());
     PTOperator o3unifier = o3unifiers.iterator().next();
     Assert.assertEquals("unifier inputs " + o3unifier, 8, 
o3unifier.getInputs().size());
@@ -573,7 +574,7 @@ public class PhysicalPlanTest
     Set<PTOperator> expUndeploy = 
Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
     expUndeploy.addAll(n2Instances);
     expUndeploy.addAll(plan.getOperators(o3Meta));
-    expUndeploy.addAll(plan.getMergeOperators(o3Meta));
+    expUndeploy.addAll(o3unifiers);
 
     // verify load update generates expected events per configuration
     Assert.assertEquals("stats handlers " + po, 1, po.statsListeners.size());
@@ -629,10 +630,12 @@ public class PhysicalPlanTest
 
     Assert.assertEquals("" + ctx.undeploy, expUndeploy, ctx.undeploy);
 
+    o3unifiers = 
plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values();
+
     Set<PTOperator> expDeploy = 
Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
     expDeploy.addAll(plan.getOperators(node2Meta));
     expDeploy.addAll(plan.getOperators(o3Meta));
-    expDeploy.addAll(plan.getMergeOperators(o3Meta));
+    expDeploy.addAll(o3unifiers);
 
     Assert.assertEquals("" + ctx.deploy, expDeploy, ctx.deploy);
     for (PTOperator oper : ctx.deploy) {
@@ -667,9 +670,9 @@ public class PhysicalPlanTest
     PTOperator p1Doper = o1p1.getOutputs().get(0).sinks.get(0).target;
     Assert.assertSame("", p1Doper.getOperatorMeta(), 
o1Meta.getMeta(o1.output).getUnifierMeta());
     Assert.assertTrue("unifier ", p1Doper.isUnifier());
+    Assert.assertEquals("Unifiers " + o1Meta, 1, 
o1p1.getOutputs().get(0).sinks.size());
 
-    Collection<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta);
-    Assert.assertEquals("unifiers " + o1Meta, 1, o1Unifiers.size());
+    Collection<PTOperator> o1Unifiers = new 
ArrayList<>(plan.getOperators(dag.getMeta(o2)).get(0).upstreamMerge.values());
 
     StatsListener l = o1p1.statsListeners.get(0);
     Assert.assertTrue("stats handlers " + o1p1.statsListeners, l instanceof 
PartitioningTest.PartitionLoadWatch);
@@ -684,7 +687,8 @@ public class PhysicalPlanTest
     List<PTOperator> o1NewPartitions = plan.getOperators(o1Meta);
     Assert.assertEquals("partitions " + o1NewPartitions, 1, 
o1NewPartitions.size());
 
-    List<PTOperator> o1NewUnifiers = plan.getMergeOperators(o1Meta);
+    List<PTOperator> o1NewUnifiers = new 
ArrayList<>(plan.getOperators(dag.getMeta(o2)).get(0).upstreamMerge.values());
+
     Assert.assertEquals("unifiers " + o1Meta, 0, o1NewUnifiers.size());
     p1Doper = o1p1.getOutputs().get(0).sinks.get(0).target;
     Assert.assertTrue("", p1Doper.getOperatorMeta() == dag.getMeta(o2));
@@ -700,7 +704,8 @@ public class PhysicalPlanTest
     Assert.assertEquals("partition scaling triggered", 1, ctx.events.size());
     ctx.events.remove(0).run();
 
-    o1NewUnifiers = plan.getMergeOperators(o1Meta);
+    
o1NewUnifiers.addAll(plan.getOperators(dag.getMeta(o2)).get(0).upstreamMerge.values());
+
     Assert.assertEquals("unifiers " + o1Meta, 1, o1NewUnifiers.size());
     Assert.assertEquals("unifier activation checkpoint " + o1Meta, 3, 
o1NewUnifiers.get(0).recoveryCheckpoint.windowId);
   }
@@ -938,7 +943,7 @@ public class PhysicalPlanTest
     GenericTestOperator single = dag.addOperator("single", 
GenericTestOperator.class);
     dag.addStream("partitionedParallel_outport1", 
partitionedParallel.outport1, single.inport1);
 
-    int maxContainers = 7;
+    int maxContainers = 6;
     dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext());
 
@@ -1017,12 +1022,12 @@ public class PhysicalPlanTest
     GenericTestOperator o5single = dag.addOperator("o5single", 
GenericTestOperator.class);
     dag.addStream("o4outport1", o4.outport1, o5single.inport1);
 
-    int maxContainers = 5;
+    int maxContainers = 4;
     dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext());
 
     PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
-    Assert.assertEquals("number of containers", 5, 
plan.getContainers().size());
+    Assert.assertEquals("number of containers", 4, 
plan.getContainers().size());
 
     PTContainer container1 = plan.getContainers().get(0);
     Assert.assertEquals("number operators " + container1, 1, 
container1.getOperators().size());
@@ -1049,25 +1054,24 @@ public class PhysicalPlanTest
       }
     }
 
-    // container 4: merge operator for o4
-    Collection<PTOperator> o4Unifiers = plan.getMergeOperators(o4Meta);
-    Assert.assertEquals("unifier " + o4Meta + ": " + o4Unifiers, 1, 
o4Unifiers.size());
+    // container 4: Unifier for o4 & O5
     PTContainer container4 = plan.getContainers().get(3);
-    Assert.assertEquals("number operators " + container4, 1, 
container4.getOperators().size());
-    Assert.assertEquals("operators " + container4, 
o4Meta.getMeta(o4.outport1).getUnifierMeta(), 
container4.getOperators().get(0).getOperatorMeta());
-    Assert.assertTrue("unifier " + o4, 
container4.getOperators().get(0).isUnifier());
-    Assert.assertEquals("unifier inputs" + 
container4.getOperators().get(0).getInputs(), 2, 
container4.getOperators().get(0).getInputs().size());
-    Assert.assertEquals("unifier outputs" + 
container4.getOperators().get(0).getOutputs(), 1, 
container4.getOperators().get(0).getOutputs().size());
 
-    // container 5: o5 taking input from o4 unifier
+    PTOperator ptOperatorO5 = plan.getOperators(dag.getMeta(o5single)).get(0);
+    PTOperator unifier = ptOperatorO5.upstreamMerge.values().iterator().next();
+
+    Assert.assertEquals("number operators " + container4, 2, 
container4.getOperators().size());
+    Assert.assertEquals("operators " + container4, 
o4Meta.getMeta(o4.outport1).getUnifierMeta(), unifier.getOperatorMeta());
+
+    Assert.assertEquals("unifier inputs" + unifier.getInputs(), 2, 
unifier.getInputs().size());
+    Assert.assertEquals("unifier outputs" + unifier.getOutputs(), 1, 
unifier.getOutputs().size());
+
     OperatorMeta o5Meta = dag.getMeta(o5single);
-    PTContainer container5 = plan.getContainers().get(4);
-    Assert.assertEquals("number operators " + container5, 1, 
container5.getOperators().size());
-    Assert.assertEquals("operators " + container5, o5Meta, 
container5.getOperators().get(0).getOperatorMeta());
+    Assert.assertEquals("operators " + container4, o5Meta, 
ptOperatorO5.getOperatorMeta());
     List<PTOperator> o5Instances = plan.getOperators(o5Meta);
     Assert.assertEquals("" + o5Instances, 1, o5Instances.size());
-    Assert.assertEquals("inputs" + 
container5.getOperators().get(0).getInputs(), 1, 
container5.getOperators().get(0).getInputs().size());
-    Assert.assertEquals("inputs" + 
container5.getOperators().get(0).getInputs(), container4.getOperators().get(0), 
container5.getOperators().get(0).getInputs().get(0).source.source);
+    Assert.assertEquals("inputs" + ptOperatorO5.getInputs(), 1, 
ptOperatorO5.getInputs().size());
+    Assert.assertEquals("inputs" + ptOperatorO5.getInputs(), unifier, 
ptOperatorO5.getInputs().get(0).source.source);
 
     // verify partitioner was called for parallel partition
     Assert.assertNotNull("partitioner called " + o3_1, o3_1.partitions);
@@ -1214,8 +1218,8 @@ public class PhysicalPlanTest
       for (PTOperator ptOperator : plan.getOperators(o2Meta)) {
         expDeploy.addAll(ptOperator.upstreamMerge.values());
       }
-      // from 3 to 2 the containers decrease from 5 to 4, but from 2 to 1 the 
container remains same because single unifier are not inline with single 
operator partition
-      Assert.assertEquals("number of containers", 4, 
plan.getContainers().size());
+
+      Assert.assertEquals("number of containers", 4 - i, 
plan.getContainers().size());
       Assert.assertEquals("number of operators", 2 - i, 
plan.getOperators(o2Meta).size());
       Assert.assertEquals("undeployed operators " + ctx.undeploy, expUndeploy, 
ctx.undeploy);
       Assert.assertEquals("deployed operators " + ctx.deploy, expDeploy, 
ctx.deploy);
@@ -1550,19 +1554,16 @@ public class PhysicalPlanTest
     dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
 
     PhysicalPlan plan = new PhysicalPlan(dag, ctx);
-    Assert.assertEquals("number of containers", 5, 
plan.getContainers().size());
+    Assert.assertEquals("number of containers", 4, 
plan.getContainers().size());
 
     List<PTOperator> o1ops = plan.getOperators(o1Meta);
     Assert.assertEquals("number of o1 operators", 3, o1ops.size());
 
     List<PTOperator> o2ops = plan.getOperators(o2Meta);
     Assert.assertEquals("number of o2 operators", 1, o2ops.size());
-
-    List<PTOperator> uops = plan.getMergeOperators(o1Meta);
-
     Set<PTOperator> expUndeploy = Sets.newLinkedHashSet();
     expUndeploy.addAll(plan.getOperators(o2Meta));
-    expUndeploy.addAll(uops);
+    
expUndeploy.add(plan.getOperators(o2Meta).get(0).upstreamMerge.values().iterator().next());
 
     for (int i = 0; i < 2; ++i) {
       PartitioningTest.PartitionLoadWatch.put(o1ops.get(i), 1);
@@ -1572,7 +1573,7 @@ public class PhysicalPlanTest
     ctx.backupRequests = 0;
     ctx.events.remove(0).run();
 
-    Assert.assertEquals("number of containers", 7, 
plan.getContainers().size());
+    Assert.assertEquals("number of containers", 6, 
plan.getContainers().size());
 
     Assert.assertEquals("undeployed opertors", expUndeploy, ctx.undeploy);
   }
@@ -2221,7 +2222,7 @@ public class PhysicalPlanTest
     dag.addStream("o1.outport1", o1.outport1, o2.inport1);
     dag.addStream("o2.outport1", o2.outport1, o3.inport1);
     PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
-    Assert.assertEquals("number of containers", 10, 
plan.getContainers().size());
+    Assert.assertEquals("number of containers", 9, 
plan.getContainers().size());
   }
 
   @Test
@@ -2241,6 +2242,6 @@ public class PhysicalPlanTest
     dag.addStream("o1.outport1", o1.outport1, o2.inport1);
     dag.addStream("o2.outport1", o2.outport1, o3.inport1);
     PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
-    Assert.assertEquals("number of containers", 8, 
plan.getContainers().size());
+    Assert.assertEquals("number of containers", 7, 
plan.getContainers().size());
   }
 }

Reply via email to