Repository: apex-core Updated Branches: refs/heads/master cfe9cefed -> 8829286d1
APEXCORE-703 Window processing timeout for finished/undeployed container. During operator shutdown, mark the operator as INACTIVE to exclude it from the blocked-operators check. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8829286d Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8829286d Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8829286d Branch: refs/heads/master Commit: 8829286d12c755b9678498183b2ba052519a73c2 Parents: cfe9cef Author: Vlad Rozov <[email protected]> Authored: Sun Apr 16 09:34:09 2017 -0700 Committer: Vlad Rozov <[email protected]> Committed: Sat Apr 22 08:03:22 2017 -0700 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 1 + .../stram/plan/physical/PTOperator.java | 2 +- .../stram/plan/physical/PhysicalPlan.java | 2 +- .../stram/StreamingContainerManagerTest.java | 108 ++++++++++++++++++- 4 files changed, 109 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/8829286d/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index d029b16..8d99dc1 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1378,6 +1378,7 @@ public class StreamingContainerManager implements PlanContext } deactivatedOpers.add(oper); } + oper.setState(State.INACTIVE); sca.undeployOpers.add(oper.getId()); slowestUpstreamOp.remove(oper); // record operator stop event 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8829286d/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java index 471dca2..84f6a5a 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java @@ -497,7 +497,7 @@ public class PTOperator implements java.io.Serializable @Override public String toString() { - return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id).append("name", name).toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id).append("name", name).append("state", state).toString(); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/8829286d/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index ecc010c..ab2a3ae 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -1440,7 +1440,7 @@ public class PhysicalPlan implements Serializable void removePTOperator(PTOperator oper) { - LOG.debug("Removing operator " + oper); + LOG.debug("Removing operator {}", oper); // per partition merge operators if (!oper.upstreamMerge.isEmpty()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/8829286d/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 53f18f9..3f2c20b 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -369,6 +369,103 @@ public class StreamingContainerManagerTest Assert.assertEquals("sourcePortName " + node3DI, mergeNodeDI.outputs.get(0).portName, node3In.sourcePortName); } + private static void shutdownOperator(StreamingContainerManager scm, PTOperator p1, PTOperator p2) + { + assignContainer(scm, "c1"); + assignContainer(scm, "c2"); + + ContainerHeartbeat c1hb = new ContainerHeartbeat(); + c1hb.setContainerStats(new ContainerStats(p1.getContainer().getExternalId())); + scm.processHeartbeat(c1hb); + + ContainerHeartbeat c2hb = new ContainerHeartbeat(); + c2hb.setContainerStats(new ContainerStats(p2.getContainer().getExternalId())); + scm.processHeartbeat(c2hb); + + OperatorHeartbeat o1hb = new OperatorHeartbeat(); + c1hb.getContainerStats().addNodeStats(o1hb); + o1hb.setNodeId(p1.getId()); + o1hb.setState(DeployState.ACTIVE); + OperatorStats o1stats = new OperatorStats(); + o1hb.getOperatorStatsContainer().add(o1stats); + o1stats.checkpoint = new Checkpoint(2, 0, 0); + o1stats.windowId = 3; + scm.processHeartbeat(c1hb); + Assert.assertEquals(PTOperator.State.ACTIVE, p1.getState()); + + OperatorHeartbeat o2hb = new OperatorHeartbeat(); + c2hb.getContainerStats().addNodeStats(o2hb); + o2hb.setNodeId(p2.getId()); + o2hb.setState(DeployState.ACTIVE); + OperatorStats o2stats = new OperatorStats(); + o2stats.checkpoint = new Checkpoint(2, 0, 0); + o2stats.windowId = 3; + scm.processHeartbeat(c2hb); + Assert.assertEquals(PTOperator.State.ACTIVE, p1.getState()); + Assert.assertEquals(PTOperator.State.ACTIVE, p2.getState()); + + o1hb.setState(DeployState.SHUTDOWN); + o1stats.checkpoint = new Checkpoint(4, 0,0); + 
o1stats.windowId = 5; + scm.processHeartbeat(c1hb); + Assert.assertEquals(PTOperator.State.INACTIVE, p1.getState()); + } + + @Test + public void testShutdownOperatorTimeout() throws Exception + { + GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + + dag.addStream("s1", o1.outport1, o2.inport1); + + dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); + dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 50); + dag.setAttribute(OperatorContext.TIMEOUT_WINDOW_COUNT, 1); + + StreamingContainerManager scm = new StreamingContainerManager(dag); + + PhysicalPlan plan = scm.getPhysicalPlan(); + + PTOperator p1 = plan.getOperators(dag.getMeta(o1)).get(0); + PTOperator p2 = plan.getOperators(dag.getMeta(o2)).get(0); + + shutdownOperator(scm, p1, p2); + + scm.monitorHeartbeat(false); + Assert.assertTrue(scm.containerStopRequests.isEmpty()); + Thread.sleep(100); + scm.monitorHeartbeat(false); + Assert.assertFalse(scm.containerStopRequests.containsKey(p1.getContainer().getExternalId())); + Assert.assertTrue(scm.containerStopRequests.containsKey(p2.getContainer().getExternalId())); + } + + @Test + public void testShutdownOperatorRecovery() throws Exception + { + GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + + dag.addStream("s1", o1.outport1, o2.inport1); + + dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); + + StreamingContainerManager scm = new StreamingContainerManager(dag); + scm.containerStartRequests.poll(); + scm.containerStartRequests.poll(); + + PhysicalPlan plan = scm.getPhysicalPlan(); + + PTOperator p1 = plan.getOperators(dag.getMeta(o1)).get(0); + PTOperator p2 = plan.getOperators(dag.getMeta(o2)).get(0); + + shutdownOperator(scm, p1, p2); + + 
scm.scheduleContainerRestart(p1.getContainer().getExternalId()); + ContainerStartRequest dr = scm.containerStartRequests.poll(); + Assert.assertTrue(dr.container.getOperators().contains(p1)); + } + @Test public void testRecoveryOrder() throws Exception { @@ -717,7 +814,9 @@ public class StreamingContainerManagerTest ce.getKey().bufferServerAddress = null; } - PTOperator o1p1 = physicalPlan.getOperators(dag.getMeta(o1)).get(0); + List<PTOperator> o1p = physicalPlan.getOperators(dag.getMeta(o1)); + Assert.assertEquals("o1 partitions", 1, o1p.size()); + PTOperator o1p1 = o1p.get(0); MockContainer mc1 = mockContainers.get(o1p1.getContainer()); MockOperatorStats o1p1mos = mc1.stats(o1p1.getId()); o1p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); @@ -729,7 +828,7 @@ public class StreamingContainerManagerTest o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); mc2.sendHeartbeat(); - Assert.assertEquals("2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size()); + Assert.assertEquals("o2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size()); PTOperator o2p2 = physicalPlan.getOperators(dag.getMeta(o2)).get(1); MockContainer mc3 = mockContainers.get(o2p2.getContainer()); @@ -737,6 +836,7 @@ public class StreamingContainerManagerTest o2p2mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); mc3.sendHeartbeat(); + Assert.assertEquals("o3 partitions", 1, physicalPlan.getOperators(dag.getMeta(o3)).size()); PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0); MockContainer mc4 = mockContainers.get(o3p1.getContainer()); MockOperatorStats o3p1mos = mc4.stats(o3p1.getId()); @@ -749,6 +849,10 @@ public class StreamingContainerManagerTest o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN); mc1.sendHeartbeat(); + o1p = physicalPlan.getOperators(dag.getMeta(o1)); + Assert.assertEquals("o1 partitions", 1, o1p.size()); + Assert.assertEquals("o1p1 
present", o1p1, o1p.get(0)); + Assert.assertEquals("input operator state", PTOperator.State.INACTIVE, o1p1.getState()); scm.monitorHeartbeat(false); Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId()); scm.monitorHeartbeat(false); // committedWindowId updated in next cycle
