Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.2 010ff2540 -> 4806e5fab


APEX-274 #resolve Fix the null pointer exception thrown when removing unifier
PTOperators from the physical plan. Update the unit test to cover this scenario.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d6da85c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d6da85c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d6da85c1

Branch: refs/heads/release-3.2
Commit: d6da85c1434413e768412c7fcabaf112992f2cbe
Parents: 076979a
Author: ishark <[email protected]>
Authored: Sun Nov 22 22:16:22 2015 -0800
Committer: ishark <[email protected]>
Committed: Sun Nov 22 22:20:33 2015 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        |  2 +-
 .../stram/plan/physical/PhysicalPlan.java       | 13 +++++---
 .../stram/StreamingContainerManagerTest.java    | 34 +++++++++++++++++++-
 3 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d6da85c1/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index ca724db..29c6a2c 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2396,7 +2396,7 @@ public class StreamingContainerManager implements 
PlanContext
         }
       }
     }
-    if (physicalOperators.size() > 0) {
+    if (physicalOperators.size() > 0 && checkpointTimeAggregate.getAvg() != 
null) {
       loi.checkpointTimeMA = checkpointTimeAggregate.getAvg().longValue();
       loi.counters = latestLogicalCounters.get(operator.getName());
       loi.autoMetrics = latestLogicalMetrics.get(operator.getName());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d6da85c1/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index f560a35..7858ea0 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1117,10 +1117,15 @@ public class PhysicalPlan implements Serializable
       }
     }
     PMapping currentMapping = this.logicalToPTOperator.get(p.operatorMeta);
-    List<PTOperator> copyPartitions = 
Lists.newArrayList(currentMapping.partitions);
-    copyPartitions.remove(p);
-    removePartition(p, currentMapping);
-    currentMapping.partitions = copyPartitions;
+    if (currentMapping != null) {
+      List<PTOperator> copyPartitions = 
Lists.newArrayList(currentMapping.partitions);
+      copyPartitions.remove(p);
+      removePartition(p, currentMapping);
+      currentMapping.partitions = copyPartitions;
+    } else {
+      // remove the operator
+      removePTOperator(p);
+    }
     // remove orphaned downstream operators
     for (PTOperator dop : downstreamOpers) {
       if (dop.inputs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d6da85c1/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 2884323..6d4d1a4 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -717,8 +717,12 @@ public class StreamingContainerManagerTest
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+
     dag.addStream("stream1", o1.outport1, o2.inport1);
+    dag.addStream("stream2", o2.outport1, o3.inport1);
 
+    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     StreamingContainerManager scm = new StreamingContainerManager(dag);
 
     PhysicalPlan physicalPlan = scm.getPhysicalPlan();
@@ -727,6 +731,7 @@ public class StreamingContainerManagerTest
       MockContainer mc = new MockContainer(scm, c);
       mockContainers.put(c, mc);
     }
+
     // deploy all containers
     for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) 
{
       ce.getValue().deploy();
@@ -748,6 +753,27 @@ public class StreamingContainerManagerTest
     
o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
     mc2.sendHeartbeat();
 
+    Assert.assertEquals("2 partitions", 2, 
physicalPlan.getOperators(dag.getMeta(o2)).size());
+
+    PTOperator o2p2 = physicalPlan.getOperators(dag.getMeta(o2)).get(1);
+    MockContainer mc3 = mockContainers.get(o2p2.getContainer());
+    MockOperatorStats o2p2mos = mc3.stats(o2p2.getId());
+    
o2p2mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+    mc3.sendHeartbeat();
+
+    PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0);
+    MockContainer mc4 = mockContainers.get(o3p1.getContainer());
+    MockOperatorStats o3p1mos = mc4.stats(o3p1.getId());
+    
o3p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+    mc4.sendHeartbeat();
+
+    // unifier
+    PTOperator unifier = 
physicalPlan.getMergeOperators(dag.getMeta(o2)).get(0);
+    MockContainer mc5 = mockContainers.get(unifier.getContainer());
+    MockOperatorStats unifierp1mos = mc5.stats(unifier.getId());
+    
unifierp1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+    mc5.sendHeartbeat();
+
     o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
     mc1.sendHeartbeat();
     scm.monitorHeartbeat();
@@ -755,7 +781,7 @@ public class StreamingContainerManagerTest
     scm.monitorHeartbeat(); // committedWindowId updated in next cycle
     Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
     scm.processEvents();
-    Assert.assertEquals("containers at committedWindowId=1", 2, 
physicalPlan.getContainers().size());
+    Assert.assertEquals("containers at committedWindowId=1", 5, 
physicalPlan.getContainers().size());
 
     // checkpoint window 2
     o1p1mos.checkpointWindowId(2);
@@ -765,7 +791,13 @@ public class StreamingContainerManagerTest
     Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
 
     o2p1mos.currentWindowId(2).checkpointWindowId(2);
+    o2p2mos.currentWindowId(2).checkpointWindowId(2);
+    o3p1mos.currentWindowId(2).checkpointWindowId(2);
+    unifierp1mos.currentWindowId(2).checkpointWindowId(2);
     mc2.sendHeartbeat();
+    mc3.sendHeartbeat();
+    mc4.sendHeartbeat();
+    mc5.sendHeartbeat();
     scm.monitorHeartbeat();
 
     // Operators are shutdown when both operators reach window Id 2

Reply via email to