Repository: incubator-apex-core Updated Branches: refs/heads/master a93fbf3e4 -> 7c84e05e1
APEXCORE-201 changed the way latency is calculated and fixed the problem when latency is stalled when an operator falls behind too many windows Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/6e6d6b1a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/6e6d6b1a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/6e6d6b1a Branch: refs/heads/master Commit: 6e6d6b1a23d053a622184c28759ac83503c14d7d Parents: 9e30cf8 Author: David Yan <[email protected]> Authored: Tue Dec 29 17:35:04 2015 -0800 Committer: David Yan <[email protected]> Committed: Thu Mar 10 18:02:35 2016 -0800 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 217 +++++++++---------- .../datatorrent/stram/engine/GenericNode.java | 5 +- .../com/datatorrent/stram/engine/InputNode.java | 4 +- .../stram/plan/logical/LogicalPlan.java | 18 +- .../stram/plan/physical/PhysicalPlan.java | 9 + .../stram/StreamingContainerManagerTest.java | 84 +++++++ 6 files changed, 215 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index df3bfc4..1e76010 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -171,6 +171,7 @@ public class StreamingContainerManager implements PlanContext private RecoveryHandler recoveryHandler; // window id to node id to end window stats private final ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap = new ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>>(); + private final ConcurrentMap<PTOperator, PTOperator> slowestUpstreamOp = new ConcurrentHashMap<>(); private long committedWindowId; // (operator id, port name) to timestamp private final Map<Pair<Integer, String>, Long> operatorPortLastEndWindowTimestamps = Maps.newConcurrentMap(); @@ -230,7 +231,24 @@ public class StreamingContainerManager implements PlanContext public static class CriticalPathInfo { long latency; - LinkedList<Integer> path = new LinkedList<Integer>(); + final LinkedList<Integer> path; + + public CriticalPathInfo() + { + this.path = new LinkedList<>(); + } + + private CriticalPathInfo(long latency, LinkedList<Integer> path) + { + this.latency = latency; + this.path = path; + } + + @Override + protected Object clone() throws CloneNotSupportedException + { + return new CriticalPathInfo(this.latency, (LinkedList<Integer>)this.path.clone()); + } } private static class SetOperatorProperty implements Recoverable @@ -766,6 +784,7 @@ public class StreamingContainerManager implements PlanContext Set<Integer> endWindowStatsOperators = endWindowStatsMap.keySet(); aggregateMetrics(windowId, endWindowStatsMap); + criticalPathInfo = findCriticalPath(); if (allCurrentOperators.containsAll(endWindowStatsOperators)) { if (endWindowStatsMap.size() < numOperators) { @@ -778,22 +797,6 @@ public class StreamingContainerManager implements PlanContext } } else { - // collected data from all operators for this window id. start latency calculation - List<OperatorMeta> rootOperatorMetas = plan.getLogicalPlan().getRootOperators(); - Set<PTOperator> endWindowStatsVisited = new HashSet<PTOperator>(); - Set<PTOperator> leafOperators = new HashSet<PTOperator>(); - for (OperatorMeta root : rootOperatorMetas) { - List<PTOperator> rootOperators = plan.getOperators(root); - for (PTOperator rootOperator : rootOperators) { - // DFS for visiting the operators for latency calculation - LOG.debug("Calculating latency starting from operator {}", rootOperator.getId()); - calculateLatency(rootOperator, endWindowStatsMap, endWindowStatsVisited, leafOperators); - } - } - CriticalPathInfo cpi = new CriticalPathInfo(); - //LOG.debug("Finding critical path..."); - cpi.latency = findCriticalPath(endWindowStatsMap, leafOperators, cpi.path); - criticalPathInfo = cpi; endWindowStatsOperatorMap.remove(windowId); currentEndWindowStatsWindowId = windowId; } @@ -919,106 +922,41 @@ public class StreamingContainerManager implements PlanContext return logicalMetrics.get(operatorName); } - private void calculateLatency(PTOperator oper, Map<Integer, EndWindowStats> endWindowStatsMap, Set<PTOperator> endWindowStatsVisited, Set<PTOperator> leafOperators) + private CriticalPathInfo findCriticalPath() { - endWindowStatsVisited.add(oper); - OperatorStatus operatorStatus = oper.stats; - - EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId()); - if (endWindowStats == null) { - LOG.info("End window stats is null for operator {}, probably a new operator after partitioning", oper); - return; - } - - // find the maximum end window emit time from all input ports - long upstreamMaxEmitTimestamp = -1; - PTOperator upstreamMaxEmitTimestampOperator = null; - for (PTOperator.PTInput input : oper.getInputs()) { - if (null != input.source.source) { - PTOperator upstreamOp = input.source.source; - EndWindowStats upstreamEndWindowStats = endWindowStatsMap.get(upstreamOp.getId()); - if (upstreamEndWindowStats == null) { - LOG.info("End window stats is null for operator {}", oper); - return; - } - long adjustedEndWindowEmitTimestamp = upstreamEndWindowStats.emitTimestamp; - MovingAverageLong rpcLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId()); - if (rpcLatency != null) { - adjustedEndWindowEmitTimestamp += rpcLatency.getAvg(); - } - if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) { - upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp; - upstreamMaxEmitTimestampOperator = upstreamOp; - } - } - } - - if (upstreamMaxEmitTimestamp > 0) { - long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp; - MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId()); - if (rpcLatency != null) { - adjustedEndWindowEmitTimestamp += rpcLatency.getAvg(); - } - if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) { - LOG.debug("Adding {} to latency MA for {}", adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp, oper); - operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp); - } else { - operatorStatus.latencyMA.add(0); - if (lastLatencyWarningTime < System.currentTimeMillis() - LATENCY_WARNING_THRESHOLD_MILLIS) { - LOG.warn("Latency calculation for this operator may not be correct because upstream end window timestamp is greater than this operator's end window timestamp: {} ({}) > {} ({}). Please verify that the system clocks are in sync in your cluster. You can also try tweaking the RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to {}).", - upstreamMaxEmitTimestamp, upstreamMaxEmitTimestampOperator, adjustedEndWindowEmitTimestamp, oper, this.vars.rpcLatencyCompensationSamples); - lastLatencyWarningTime = System.currentTimeMillis(); - } - } - } - - if (oper.getOutputs().isEmpty()) { - // it is a leaf operator - leafOperators.add(oper); - } - else { - for (PTOperator.PTOutput output : oper.getOutputs()) { - for (PTOperator.PTInput input : output.sinks) { - if (input.target != null) { - PTOperator downStreamOp = input.target; - if (!endWindowStatsVisited.contains(downStreamOp)) { - calculateLatency(downStreamOp, endWindowStatsMap, endWindowStatsVisited, leafOperators); - } - } - } + CriticalPathInfo result = null; + List<PTOperator> leafOperators = plan.getLeafOperators(); + Map<PTOperator, CriticalPathInfo> cache = new HashMap<>(); + for (PTOperator leafOperator : leafOperators) { + CriticalPathInfo cpi = findCriticalPathHelper(leafOperator, cache); + if (result == null || result.latency < cpi.latency) { + result = cpi; } } + return result; } - /* - * returns cumulative latency - */ - private long findCriticalPath(Map<Integer, EndWindowStats> endWindowStatsMap, Set<PTOperator> operators, LinkedList<Integer> criticalPath) - { - long maxEndWindowTimestamp = 0; - PTOperator maxOperator = null; - for (PTOperator operator : operators) { - EndWindowStats endWindowStats = endWindowStatsMap.get(operator.getId()); - if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) { - maxEndWindowTimestamp = endWindowStats.emitTimestamp; - maxOperator = operator; - } - } - if (maxOperator == null) { - return 0; - } - criticalPath.addFirst(maxOperator.getId()); - OperatorStatus operatorStatus = maxOperator.stats; - operators.clear(); - if (maxOperator.getInputs() == null || maxOperator.getInputs().isEmpty()) { - return operatorStatus.latencyMA.getAvg(); + private CriticalPathInfo findCriticalPathHelper(PTOperator operator, Map<PTOperator, CriticalPathInfo> cache) + { + CriticalPathInfo cpi = cache.get(operator); + if (cpi != null) { + return cpi; } - for (PTOperator.PTInput input : maxOperator.getInputs()) { - if (null != input.source.source && !input.delay) { - operators.add(input.source.source); + PTOperator slowestUpstreamOperator = slowestUpstreamOp.get(operator); + if (slowestUpstreamOperator != null) { + cpi = findCriticalPathHelper(slowestUpstreamOperator, cache); + try { + cpi = (CriticalPathInfo)cpi.clone(); + } catch (CloneNotSupportedException ex) { + throw new RuntimeException(); } + } else { + cpi = new CriticalPathInfo(); } - return operatorStatus.latencyMA.getAvg() + findCriticalPath(endWindowStatsMap, operators, criticalPath); + cpi.latency += operator.stats.getLatencyMA(); + cpi.path.addLast(operator.getId()); + cache.put(operator, cpi); + return cpi; } public int processEvents() @@ -1338,7 +1276,7 @@ public class StreamingContainerManager implements PlanContext switch (oper.getState()) { case ACTIVE: // Commented out the warning below because it's expected when the operator does something - // quickly and goes out of commission, it will report SHUTDOWN correcly whereas this code + // quickly and goes out of commission, it will report SHUTDOWN correctly whereas this code // is incorrectly expecting ACTIVE to be reported. //LOG.warn("status out of sync {} expected {} remote {}", oper, oper.getState(), ds); // operator expected active, check remote status @@ -1363,12 +1301,14 @@ public class StreamingContainerManager implements PlanContext deactivatedOpers.add(oper); } sca.undeployOpers.add(oper.getId()); + slowestUpstreamOp.remove(oper); // record operator stop event recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); break; case FAILED: processOperatorFailure(oper); sca.undeployOpers.add(oper.getId()); + slowestUpstreamOp.remove(oper); recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); break; case ACTIVE: @@ -1386,6 +1326,7 @@ public class StreamingContainerManager implements PlanContext else { // operator is currently deployed, request undeploy sca.undeployOpers.add(oper.getId()); + slowestUpstreamOp.remove(oper); } break; case PENDING_DEPLOY: @@ -1408,6 +1349,7 @@ public class StreamingContainerManager implements PlanContext if (ds != null) { // operator was removed and needs to be undeployed from container sca.undeployOpers.add(oper.getId()); + slowestUpstreamOp.remove(oper); recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); } } @@ -1606,8 +1548,11 @@ public class StreamingContainerManager implements PlanContext tuplesProcessed += s.tupleCount; endWindowStats.dequeueTimestamps.put(s.id, s.endWindowTimestamp); - Pair<Integer, String> operatorPortName = new Pair<Integer, String>(oper.getId(), s.id); - long lastEndWindowTimestamp = operatorPortLastEndWindowTimestamps.containsKey(operatorPortName) ? operatorPortLastEndWindowTimestamps.get(operatorPortName) : lastStatsTimestamp; + Pair<Integer, String> operatorPortName = new Pair<>(oper.getId(), s.id); + Long lastEndWindowTimestamp = operatorPortLastEndWindowTimestamps.get(operatorPortName); + if (lastEndWindowTimestamp == null) { + lastEndWindowTimestamp = lastStatsTimestamp; + } long portElapsedMillis = Math.max(s.endWindowTimestamp - lastEndWindowTimestamp, 0); //LOG.debug("=== PROCESSED TUPLE COUNT for {}: {}, {}, {}, {}", operatorPortName, s.tupleCount, portElapsedMillis, operatorPortLastEndWindowTimestamps.get(operatorPortName), lastStatsTimestamp); ps.tuplesPMSMA.add(s.tupleCount, portElapsedMillis); @@ -1647,13 +1592,16 @@ public class StreamingContainerManager implements PlanContext ps.recordingId = s.recordingId; tuplesEmitted += s.tupleCount; - Pair<Integer, String> operatorPortName = new Pair<Integer, String>(oper.getId(), s.id); - - long lastEndWindowTimestamp = operatorPortLastEndWindowTimestamps.containsKey(operatorPortName) ? operatorPortLastEndWindowTimestamps.get(operatorPortName) : lastStatsTimestamp; + Pair<Integer, String> operatorPortName = new Pair<>(oper.getId(), s.id); + Long lastEndWindowTimestamp = operatorPortLastEndWindowTimestamps.get(operatorPortName); + if (lastEndWindowTimestamp == null) { + lastEndWindowTimestamp = lastStatsTimestamp; + } long portElapsedMillis = Math.max(s.endWindowTimestamp - lastEndWindowTimestamp, 0); //LOG.debug("=== EMITTED TUPLE COUNT for {}: {}, {}, {}, {}", operatorPortName, s.tupleCount, portElapsedMillis, operatorPortLastEndWindowTimestamps.get(operatorPortName), lastStatsTimestamp); ps.tuplesPMSMA.add(s.tupleCount, portElapsedMillis); ps.bufferServerBytesPMSMA.add(s.bufferServerBytes, portElapsedMillis); + operatorPortLastEndWindowTimestamps.put(operatorPortName, s.endWindowTimestamp); if (maxEndWindowTimestamp < s.endWindowTimestamp) { maxEndWindowTimestamp = s.endWindowTimestamp; @@ -1701,6 +1649,45 @@ public class StreamingContainerManager implements PlanContext } endWindowStatsMap.put(shb.getNodeId(), endWindowStats); + if (!oper.getInputs().isEmpty()) { + long latency = Long.MAX_VALUE; + long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp; + MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId()); + if (rpcLatency != null) { + adjustedEndWindowEmitTimestamp += rpcLatency.getAvg(); + } + PTOperator slowestUpstream = null; + for (PTInput input : oper.getInputs()) { + PTOperator upstreamOp = input.source.source; + if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) { + continue; + } + EndWindowStats ews = endWindowStatsMap.get(upstreamOp.getId()); + long portLatency; + if (ews == null) { + // This is when the operator is likely to be behind too many windows. We need to give an estimate for + // latency at this point, by looking at the number of windows behind + int widthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS); + portLatency = (upstreamOp.stats.currentWindowId.get() - oper.stats.currentWindowId.get()) * widthMillis; + } else { + MovingAverageLong upstreamRPCLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId()); + portLatency = adjustedEndWindowEmitTimestamp - ews.emitTimestamp; + if (upstreamRPCLatency != null) { + portLatency -= upstreamRPCLatency.getAvg(); + } + } + if (portLatency < 0) { + portLatency = 0; + } + if (latency > portLatency) { + latency = portLatency; + slowestUpstream = upstreamOp; + } + } + status.latencyMA.add(latency); + slowestUpstreamOp.put(oper, slowestUpstream); + } + Set<Integer> allCurrentOperators = plan.getAllOperators().keySet(); int numOperators = plan.getAllOperators().size(); if (allCurrentOperators.containsAll(endWindowStatsMap.keySet()) && endWindowStatsMap.size() == numOperators) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 1ccec31..5acb15d 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -138,13 +138,12 @@ public class GenericNode extends Node<Operator> */ protected void processEndWindow(Tuple endWindowTuple) { - endWindowEmitTime = System.currentTimeMillis(); - if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) { insideWindow = false; operator.endWindow(); applicationWindowCount = 0; } + endWindowEmitTime = System.currentTimeMillis(); if (endWindowTuple == null) { emitEndWindow(); @@ -636,8 +635,8 @@ public class GenericNode extends Node<Operator> * TODO: as using a listener callback */ if (insideWindow && !shutdown) { - endWindowEmitTime = System.currentTimeMillis(); operator.endWindow(); + endWindowEmitTime = System.currentTimeMillis(); if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) { applicationWindowCount = 0; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java index 92a61f0..0bd04ef 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java @@ -122,13 +122,13 @@ public class InputNode extends Node<InputOperator> break; case END_WINDOW: - endWindowEmitTime = System.currentTimeMillis(); insideStreamingWindow = false; if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) { insideApplicationWindow = false; operator.endWindow(); applicationWindowCount = 0; } + endWindowEmitTime = System.currentTimeMillis(); for (int i = sinks.length; i-- > 0;) { sinks[i].put(t); @@ -218,8 +218,8 @@ public class InputNode extends Node<InputOperator> } if (insideApplicationWindow) { - endWindowEmitTime = System.currentTimeMillis(); operator.endWindow(); + endWindowEmitTime = System.currentTimeMillis(); if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) { applicationWindowCount = 0; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 6d7ebe1..173298b 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -158,6 +158,7 @@ public class LogicalPlan implements Serializable, DAG private final Map<String, OperatorMeta> operators = new HashMap<String, OperatorMeta>(); public final Map<String, ModuleMeta> modules = new LinkedHashMap<>(); private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>(); + private final List<OperatorMeta> leafOperators = new ArrayList<>(); private final Attribute.AttributeMap attributes = new DefaultAttributeMap(); private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>(); @@ -484,7 +485,9 @@ public class LogicalPlan implements Serializable, DAG sinks.add(portMeta); om.inputStreams.put(portMeta, this); rootOperators.remove(portMeta.operatorMeta); - + if (this.source != null && !(this.source.getOperatorMeta().getOperator() instanceof Operator.DelayOperator)) { + leafOperators.remove(this.source.getOperatorMeta()); + } return this; } @@ -512,6 +515,10 @@ public class LogicalPlan implements Serializable, DAG this.sinks.clear(); if (this.source != null) { this.source.getOperatorMeta().outputStreams.remove(this.source); + if (this.source.getOperatorMeta().outputStreams.isEmpty() && + !(this.source.getOperatorMeta().getOperator() instanceof Operator.DelayOperator)) { + leafOperators.remove(this.source.getOperatorMeta()); + } } this.source = null; streams.remove(this.id); @@ -1142,6 +1149,7 @@ public class LogicalPlan implements Serializable, DAG } OperatorMeta decl = new OperatorMeta(name, operator); rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator + leafOperators.add(decl); // will be removed when a sink is added to an output port for this operator operators.put(name, decl); return operator; } @@ -1332,6 +1340,7 @@ public class LogicalPlan implements Serializable, DAG } this.operators.remove(om.getName()); rootOperators.remove(om); + leafOperators.remove(om); } @Override @@ -1517,6 +1526,11 @@ public class LogicalPlan implements Serializable, DAG return Collections.unmodifiableList(this.rootOperators); } + public List<OperatorMeta> getLeafOperators() + { + return Collections.unmodifiableList(this.leafOperators); + } + public Collection<OperatorMeta> getAllOperators() { return Collections.unmodifiableCollection(this.operators.values()); @@ -1923,7 +1937,7 @@ public class LogicalPlan implements Serializable, DAG * @see <a href="http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm">http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm</a> * * @param om - * @param cycles + * @param ctx */ public void findStronglyConnected(OperatorMeta om, ValidationContext ctx) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index c696224..7d24633 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -1356,6 +1356,15 @@ public class PhysicalPlan implements Serializable return this.logicalToPTOperator.get(logicalOperator).getAllOperators(); } + public List<PTOperator> getLeafOperators() + { + List<PTOperator> operators = new ArrayList<>(); + for (OperatorMeta opMeta : dag.getLeafOperators()) { + operators.addAll(getAllOperators(opMeta)); + } + return operators; + } + public boolean hasMapping(OperatorMeta om) { return this.logicalToPTOperator.containsKey(om); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 8fc957b..73c2ae0 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -36,6 +36,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG.Locality; @@ -77,6 +78,7 @@ import com.datatorrent.stram.support.StramTestSupport.EmbeddedWebSocketServer; import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent; import com.datatorrent.stram.support.StramTestSupport.TestMeta; import com.datatorrent.stram.tuple.Tuple; +import com.datatorrent.stram.webapp.LogicalOperatorInfo; import org.apache.commons.lang.StringUtils; import org.codehaus.jettison.json.JSONObject; @@ -1010,4 +1012,86 @@ public class StreamingContainerManagerTest String msg = TestMetricTransport.messages.get(0); Assert.assertTrue(msg.startsWith("xyz:")); } + + public static class HighLatencyTestOperator extends GenericTestOperator + { + private long firstWindowMillis; + private long windowWidthMillis; + private long currentWindowId; + private long latency; + + @Override + public void setup(OperatorContext context) + { + firstWindowMillis = System.currentTimeMillis(); + // this is an approximation because there is no way to get to the actual value in the DAG + + windowWidthMillis = context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + } + + @Override + public void endWindow() + { + long sleepMillis = latency - + (System.currentTimeMillis() - WindowGenerator.getWindowMillis(currentWindowId, firstWindowMillis, windowWidthMillis)); + + if (sleepMillis > 0) { + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException ex) { + // move on + } + } + } + + public void setLatency(long latency) + { + this.latency = latency; + } + + } + + @Test + public void testLatency() throws Exception + { + TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + HighLatencyTestOperator o3 = dag.addOperator("o3", HighLatencyTestOperator.class); + GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class); + long latency = 5000; // 5 seconds + o3.setLatency(latency); + dag.addStream("o1.outport", o1.outport, o2.inport1, o3.inport1); + dag.addStream("o2.outport1", o2.outport1, o4.inport1); + dag.addStream("o3.outport1", o3.outport1, o4.inport2); + dag.setAttribute(Context.DAGContext.STATS_MAX_ALLOWABLE_WINDOWS_LAG, 2); // 1 second + StramLocalCluster lc = new StramLocalCluster(dag); + StreamingContainerManager dnmgr = lc.dnmgr; + lc.runAsync(); + Thread.sleep(10000); + LogicalOperatorInfo o1Info = dnmgr.getLogicalOperatorInfo("o1"); + LogicalOperatorInfo o2Info = dnmgr.getLogicalOperatorInfo("o2"); + LogicalOperatorInfo o3Info = dnmgr.getLogicalOperatorInfo("o3"); + LogicalOperatorInfo o4Info = dnmgr.getLogicalOperatorInfo("o4"); + + Assert.assertEquals("Input operator latency must be zero", 0, o1Info.latencyMA); + Assert.assertTrue("Latency must be greater than or equal to zero", o2Info.latencyMA >= 0); + Assert.assertTrue("Actual latency must be greater than the artificially introduced latency", + o3Info.latencyMA > latency); + Assert.assertTrue("Latency must be greater than or equal to zero", o4Info.latencyMA >= 0); + StreamingContainerManager.CriticalPathInfo criticalPathInfo = dnmgr.getCriticalPathInfo(); + Assert.assertArrayEquals("Critical Path must be the path in the DAG that includes the HighLatencyTestOperator", + new Integer[]{o1Info.partitions.iterator().next(), + o3Info.partitions.iterator().next(), + o4Info.partitions.iterator().next()}, + criticalPathInfo.path.toArray()); + Assert.assertTrue("Whole DAG latency must be greater than the artificially introduced latency", + criticalPathInfo.latency > latency); + lc.shutdown(); + } }
