Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 [created] f28d8d742
APEX-56 SPOI-4380 #resolve Remove terminated operators from plan after window is committed. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/76faf869 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/76faf869 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/76faf869 Branch: refs/heads/devel-3 Commit: 76faf869506d004fc2f7d470f5bb89d681b470df Parents: 3c5b88c Author: thomas <[email protected]> Authored: Thu Aug 20 11:06:33 2015 -0700 Committer: thomas <[email protected]> Committed: Thu Aug 20 11:06:33 2015 -0700 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 46 +++++++---- .../stram/plan/physical/PhysicalPlan.java | 32 +++++--- .../com/datatorrent/stram/MockContainer.java | 2 +- .../com/datatorrent/stram/StreamCodecTest.java | 35 +-------- .../stram/StreamingContainerManagerTest.java | 81 +++++++++++++++++++- 5 files changed, 134 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 6e0f3f5..eed2948 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -154,6 +154,7 @@ public class StreamingContainerManager implements PlanContext private long lastResourceRequest = 0; private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>(); private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>(); + private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<>(); private CriticalPathInfo criticalPathInfo; private final ConcurrentMap<PTOperator, PTOperator> reportStats = Maps.newConcurrentMap(); private final AtomicBoolean deployChangeInProgress = new AtomicBoolean(); @@ -1003,6 +1004,26 @@ public class StreamingContainerManager implements PlanContext } reportStats.remove(o); } + + if (!this.shutdownOperators.isEmpty()) { + synchronized (this.shutdownOperators) { + Iterator<Map.Entry<Long, Set<PTOperator>>> it = shutdownOperators.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<Long, Set<PTOperator>> windowAndOpers = it.next(); + if (windowAndOpers.getKey().longValue() > this.committedWindowId) { + // wait until window is committed + continue; + } else { + LOG.info("Removing inactive operators at window {} {}", Codec.getStringWindowId(windowAndOpers.getKey()), windowAndOpers.getValue()); + for (PTOperator oper : windowAndOpers.getValue()) { + plan.removeTerminatedPartition(oper); + } + it.remove(); + } + } + } + } + if (!eventQueue.isEmpty()) { for (PTOperator oper : plan.getAllOperators().values()) { if (oper.getState() != PTOperator.State.ACTIVE) { @@ -1274,20 +1295,19 @@ public class StreamingContainerManager implements PlanContext else { switch (ds) { case SHUTDOWN: - // remove the operator from the plan - Runnable r = new Runnable() - { - @Override - public void run() - { - if (oper.getInputs().isEmpty()) { - LOG.info("Removing IDLE operator from plan {}", oper); - plan.removeIdlePartition(oper); - } + // schedule operator deactivation against the windowId + // will be processed once window is committed and all dependent operators completed processing + long windowId = oper.stats.currentWindowId.get(); + if (ohb.windowStats != null && !ohb.windowStats.isEmpty()) { + windowId = ohb.windowStats.get(ohb.windowStats.size()-1).windowId; + } + LOG.debug("Operator {} deactivated at window {}", oper, windowId); + synchronized (this.shutdownOperators) { + Set<PTOperator> deactivatedOpers = this.shutdownOperators.get(windowId); + if (deactivatedOpers == null) { + this.shutdownOperators.put(windowId, deactivatedOpers = Sets.newHashSet(oper)); } - - }; - dispatch(r); + } sca.undeployOpers.add(oper.getId()); // record operator stop event recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index 5b90c04..a57a248 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -780,7 +780,8 @@ public class PhysicalPlan implements Serializable partitioner.partitioned(mainPC.operatorIdToPartition); } - private void updateStreamMappings(PMapping m) { + private void updateStreamMappings(PMapping m) + { for (Map.Entry<OutputPortMeta, StreamMeta> opm : m.logicalOperator.getOutputStreams().entrySet()) { StreamMapping ug = m.outputStreams.get(opm.getKey()); if (ug == null) { @@ -789,7 +790,6 @@ public class PhysicalPlan implements Serializable } LOG.debug("update stream mapping for {} {}", opm.getKey().getOperatorMeta(), opm.getKey().getPortName()); ug.setSources(m.partitions); - //ug.redoMapping(); } for (Map.Entry<InputPortMeta, StreamMeta> ipm : m.logicalOperator.getInputStreams().entrySet()) { @@ -847,7 +847,6 @@ public class PhysicalPlan implements Serializable } LOG.debug("update upstream stream mapping for {} {}", sourceMapping.logicalOperator, ipm.getValue().getSource().getPortName()); ug.setSources(sourceMapping.partitions); - //ug.redoMapping(); } } @@ -990,18 +989,30 @@ public class PhysicalPlan implements Serializable } /** - * Remove a partition that was reported as idle by the execution layer. - * Since the end stream tuple is propagated to the downstream operators, - * there is no need to undeploy/redeploy them as part of this operation. + * Remove a partition that was reported as terminated by the execution layer. + * Recursively removes all downstream operators with no remaining input. * @param p */ - public void removeIdlePartition(PTOperator p) + public void removeTerminatedPartition(PTOperator p) { + // keep track of downstream operators for cascading remove + Set<PTOperator> downstreamOpers = new HashSet<>(p.outputs.size()); + for (PTOutput out : p.outputs) { + for (PTInput sinkIn : out.sinks) { + downstreamOpers.add(sinkIn.target); + } + } PMapping currentMapping = this.logicalToPTOperator.get(p.operatorMeta); List<PTOperator> copyPartitions = Lists.newArrayList(currentMapping.partitions); copyPartitions.remove(p); removePartition(p, currentMapping); currentMapping.partitions = copyPartitions; + // remove orphaned downstream operators + for (PTOperator dop : downstreamOpers) { + if (dop.inputs.isEmpty()) { + removeTerminatedPartition(dop); + } + } deployChanges(); } @@ -1012,8 +1023,8 @@ public class PhysicalPlan implements Serializable * @param oper * @return */ - private void removePartition(PTOperator oper, PMapping operatorMapping) { - + private void removePartition(PTOperator oper, PMapping operatorMapping) + { // remove any parallel partition for (PTOutput out : oper.outputs) { // copy list as it is modified by recursive remove @@ -1137,7 +1148,8 @@ public class PhysicalPlan implements Serializable return inputPortList; } - void removePTOperator(PTOperator oper) { + void removePTOperator(PTOperator oper) + { LOG.debug("Removing operator " + oper); // per partition merge operators http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/test/java/com/datatorrent/stram/MockContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/MockContainer.java b/engine/src/test/java/com/datatorrent/stram/MockContainer.java index 7a6ba64..c0b704f 100644 --- a/engine/src/test/java/com/datatorrent/stram/MockContainer.java +++ b/engine/src/test/java/com/datatorrent/stram/MockContainer.java @@ -91,7 +91,7 @@ public class MockContainer for (Map.Entry<Integer, MockOperatorStats> oe : this.stats.entrySet()) { OperatorHeartbeat ohb = new OperatorHeartbeat(); ohb.setNodeId(oe.getKey()); - ohb.setState(OperatorHeartbeat.DeployState.ACTIVE); + ohb.setState(oe.getValue().deployState); OperatorStats lstats = new OperatorStats(); lstats.checkpoint = new Checkpoint(oe.getValue().checkpointWindowId, 0, 0); lstats.windowId = oe.getValue().currentWindowId; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index 9726e65..d7a7fff 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -1178,28 +1178,6 @@ public class StreamCodecTest return unifiers; } - private void checkNotSetStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id, - Integer streamCodecIdentifier) - { - StreamCodec<?> streamCodecInfo = streamCodecs.get(streamCodecIdentifier); - Assert.assertNotNull("stream codec null " + id, streamCodecInfo); - Assert.assertNull("stream codec object not null " + id, streamCodecInfo); - } - - private void checkStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id, - Integer streamCodecIdentifier, StreamCodec<?> streamCodec) - { - checkStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodec, null); - } - - private void checkStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id, - Integer streamCodecIdentifier, StreamCodec<?> streamCodec, String className) - { - StreamCodec<?> streamCodecInfo = streamCodecs.get(streamCodecIdentifier); - Assert.assertNotNull("stream codec info null " + id, streamCodecInfo); - Assert.assertEquals("stream codec object " + id, streamCodec, streamCodecInfo); - } - private void checkPresentStreamCodec(LogicalPlan.OperatorMeta operatorMeta, Operator.InputPort<?> inputPort, Map<Integer, StreamCodec<?>> streamCodecs, String id, PhysicalPlan plan ) @@ -1277,17 +1255,6 @@ public class StreamCodecTest return otdi; } - private LogicalPlan.InputPortMeta getInputPortMeta(LogicalPlan.StreamMeta streamMeta, LogicalPlan.OperatorMeta operatorMeta) - { - LogicalPlan.InputPortMeta portMeta = null; - for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : operatorMeta.getInputStreams().entrySet()) { - if (entry.getValue() == streamMeta) { - portMeta = entry.getKey(); - } - } - return portMeta; - } - // For tests so that it doesn't trigger assignment of a new id public boolean isStrCodecPresent(StreamCodec<?> streamCodecInfo, PhysicalPlan plan) { @@ -1316,7 +1283,7 @@ public class StreamCodecTest public static class DefaultTestStreamCodec extends DefaultStatefulStreamCodec<Object> implements Serializable { - + private static final long serialVersionUID = 1L; } public static class DefaultCodecOperator extends GenericTestOperator http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index a238e3e..89f2878 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -31,7 +31,6 @@ import org.junit.Test; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; @@ -40,10 +39,10 @@ import com.datatorrent.api.Stats.OperatorStats; import com.datatorrent.api.Stats.OperatorStats.PortStats; import com.datatorrent.api.StatsListener; import com.datatorrent.api.annotation.Stateless; - import com.datatorrent.common.partitioner.StatelessPartitioner; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.FSStorageAgent; +import com.datatorrent.stram.MockContainer.MockOperatorStats; import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest; import com.datatorrent.stram.StreamingContainerManager.ContainerResource; import com.datatorrent.stram.api.AppDataSource; @@ -56,6 +55,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHe import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState; import com.datatorrent.stram.appdata.AppDataPushAgent; import com.datatorrent.stram.codec.DefaultStatefulStreamCodec; import com.datatorrent.stram.engine.*; @@ -72,12 +72,14 @@ import com.datatorrent.stram.support.StramTestSupport.EmbeddedWebSocketServer; import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent; import com.datatorrent.stram.support.StramTestSupport.TestMeta; import com.datatorrent.stram.tuple.Tuple; + import org.apache.commons.lang.StringUtils; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.eclipse.jetty.websocket.WebSocket; -public class StreamingContainerManagerTest { +public class StreamingContainerManagerTest +{ @Rule public TestMeta testMeta = new TestMeta(); @Test @@ -703,6 +705,74 @@ public class StreamingContainerManagerTest { Assert.assertEquals("type " + o1DeployInfo, OperatorDeployInfo.OperatorType.INPUT, o1DeployInfo.type); } + @Test + public void testOperatorShutdown() + { + LogicalPlan dag = new LogicalPlan(); + dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); + + GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + dag.addStream("stream1", o1.outport1, o2.inport1); + + StreamingContainerManager scm = new StreamingContainerManager(dag); + + PhysicalPlan physicalPlan = scm.getPhysicalPlan(); + Map<PTContainer, MockContainer> mockContainers = new HashMap<>(); + for (PTContainer c : physicalPlan.getContainers()) { + MockContainer mc = new MockContainer(scm, c); + mockContainers.put(c, mc); + } + // deploy all containers + for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) { + ce.getValue().deploy(); + // skip buffer server purge in monitorHeartbeat + ce.getKey().bufferServerAddress = null; + } + + PTOperator o1p1 = physicalPlan.getOperators(dag.getMeta(o1)).get(0); + MockContainer mc1 = mockContainers.get(o1p1.getContainer()); + MockOperatorStats o1p1mos = mc1.stats(o1p1.getId()); + o1p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); + mc1.sendHeartbeat(); + + PTOperator o2p1 = physicalPlan.getOperators(dag.getMeta(o2)).get(0); + MockContainer mc2 = mockContainers.get(o2p1.getContainer()); + MockOperatorStats o2p1mos = mc2.stats(o2p1.getId()); + o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE); + mc2.sendHeartbeat(); + + o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN); + mc1.sendHeartbeat(); + scm.monitorHeartbeat(); + Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId()); + scm.monitorHeartbeat(); // committedWindowId updated in next cycle + Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId()); + scm.processEvents(); + Assert.assertEquals("containers at committedWindowId=1", 2, physicalPlan.getContainers().size()); + + // checkpoint window 2 + o1p1mos.checkpointWindowId(2); + mc1.sendHeartbeat(); + scm.monitorHeartbeat(); + + o2p1mos.currentWindowId(2).checkpointWindowId(2); + mc2.sendHeartbeat(); + scm.monitorHeartbeat(); + Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId()); + scm.monitorHeartbeat(); // committedWindowId updated in next cycle + Assert.assertEquals("committedWindowId", 2, scm.getCommittedWindowId()); + Assert.assertEquals(1, o1p1.getContainer().getOperators().size()); + Assert.assertEquals(1, o2p1.getContainer().getOperators().size()); + Assert.assertEquals(2, physicalPlan.getContainers().size()); + + // call again as events are processed after committed window was updated + scm.processEvents(); + Assert.assertEquals(0, o1p1.getContainer().getOperators().size()); + Assert.assertEquals(0, o2p1.getContainer().getOperators().size()); + Assert.assertEquals(0, physicalPlan.getContainers().size()); + } private void testDownStreamPartition(Locality locality) throws Exception { @@ -738,7 +808,8 @@ public class StreamingContainerManagerTest { } @Test - public void testPhysicalPropertyUpdate() throws Exception{ + public void testPhysicalPropertyUpdate() throws Exception + { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); @@ -755,6 +826,7 @@ public class StreamingContainerManagerTest { Future<?> future = dnmgr.getPhysicalOperatorProperty(lc.getPlanOperators(dag.getMeta(o1)).get(0).getId(), "maxTuples", 10000); Object object = future.get(10000, TimeUnit.MILLISECONDS); Assert.assertNotNull(object); + @SuppressWarnings("unchecked") Map<String, Object> propertyValue = (Map<String, Object>)object; Assert.assertEquals(2,propertyValue.get("maxTuples")); lc.shutdown(); @@ -873,6 +945,7 @@ public class StreamingContainerManagerTest { pushAgent.pushData(); Thread.sleep(1000); Assert.assertTrue(messages.size() > 0); + pushAgent.close(); JSONObject message = messages.get(0); System.out.println("Got this message: " + message.toString(2)); Assert.assertEquals(topic, message.getString("topic"));
