Repository: incubator-apex-core Updated Branches: refs/heads/release-3.2 010ff2540 -> 4806e5fab
APEX-274 #resolve Fixing null pointer exception in removing unifier PTOperators from Physical Plan. Updated unit test to check this scenario Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d6da85c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d6da85c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d6da85c1 Branch: refs/heads/release-3.2 Commit: d6da85c1434413e768412c7fcabaf112992f2cbe Parents: 076979a Author: ishark <[email protected]> Authored: Sun Nov 22 22:16:22 2015 -0800 Committer: ishark <[email protected]> Committed: Sun Nov 22 22:20:33 2015 -0800 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 2 +- .../stram/plan/physical/PhysicalPlan.java | 13 +++++--- .../stram/StreamingContainerManagerTest.java | 34 +++++++++++++++++++- 3 files changed, 43 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d6da85c1/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index ca724db..29c6a2c 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -2396,7 +2396,7 @@ public class StreamingContainerManager implements PlanContext } } } - if (physicalOperators.size() > 0) { + if (physicalOperators.size() > 0 && checkpointTimeAggregate.getAvg() != null) { loi.checkpointTimeMA = checkpointTimeAggregate.getAvg().longValue(); loi.counters = latestLogicalCounters.get(operator.getName()); loi.autoMetrics = latestLogicalMetrics.get(operator.getName()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d6da85c1/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index f560a35..7858ea0 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -1117,10 +1117,15 @@ public class PhysicalPlan implements Serializable } } PMapping currentMapping = this.logicalToPTOperator.get(p.operatorMeta); - List<PTOperator> copyPartitions = Lists.newArrayList(currentMapping.partitions); - copyPartitions.remove(p); - removePartition(p, currentMapping); - currentMapping.partitions = copyPartitions; + if (currentMapping != null) { + List<PTOperator> copyPartitions = Lists.newArrayList(currentMapping.partitions); + copyPartitions.remove(p); + removePartition(p, currentMapping); + currentMapping.partitions = copyPartitions; + } else { + // remove the operator + removePTOperator(p); + } // remove orphaned downstream operators for (PTOperator dop : downstreamOpers) { if (dop.inputs.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d6da85c1/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 2884323..6d4d1a4 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -717,8 +717,12 @@ public class StreamingContainerManagerTest GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); + dag.addStream("stream1", o1.outport1, o2.inport1); + dag.addStream("stream2", o2.outport1, o3.inport1); + dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); StreamingContainerManager scm = new StreamingContainerManager(dag); PhysicalPlan physicalPlan = scm.getPhysicalPlan(); @@ -727,6 +731,7 @@ public class StreamingContainerManagerTest MockContainer mc = new MockContainer(scm, c); mockContainers.put(c, mc); } + // deploy all containers for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) { ce.getValue().deploy(); @@ -748,6 +753,27 @@ public class StreamingContainerManagerTest o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); mc2.sendHeartbeat(); + Assert.assertEquals("2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size()); + + PTOperator o2p2 = physicalPlan.getOperators(dag.getMeta(o2)).get(1); + MockContainer mc3 = mockContainers.get(o2p2.getContainer()); + MockOperatorStats o2p2mos = mc3.stats(o2p2.getId()); + o2p2mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); + mc3.sendHeartbeat(); + + PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0); + MockContainer mc4 = mockContainers.get(o3p1.getContainer()); + MockOperatorStats o3p1mos = mc4.stats(o3p1.getId()); + o3p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); + mc4.sendHeartbeat(); + + // unifier + PTOperator unifier = physicalPlan.getMergeOperators(dag.getMeta(o2)).get(0); + MockContainer mc5 = mockContainers.get(unifier.getContainer()); + MockOperatorStats unifierp1mos = mc5.stats(unifier.getId()); + unifierp1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); + mc5.sendHeartbeat(); + o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN); mc1.sendHeartbeat(); scm.monitorHeartbeat(); @@ -755,7 +781,7 @@ public class StreamingContainerManagerTest scm.monitorHeartbeat(); // committedWindowId updated in next cycle Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId()); scm.processEvents(); - Assert.assertEquals("containers at committedWindowId=1", 2, physicalPlan.getContainers().size()); + Assert.assertEquals("containers at committedWindowId=1", 5, physicalPlan.getContainers().size()); // checkpoint window 2 o1p1mos.checkpointWindowId(2); @@ -765,7 +791,13 @@ public class StreamingContainerManagerTest Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId()); o2p1mos.currentWindowId(2).checkpointWindowId(2); + o2p2mos.currentWindowId(2).checkpointWindowId(2); + o3p1mos.currentWindowId(2).checkpointWindowId(2); + unifierp1mos.currentWindowId(2).checkpointWindowId(2); mc2.sendHeartbeat(); + mc3.sendHeartbeat(); + mc4.sendHeartbeat(); + mc5.sendHeartbeat(); scm.monitorHeartbeat(); // Operators are shutdown when both operators reach window Id 2
