Repository: incubator-apex-core
Updated Branches:
  refs/heads/master a93fbf3e4 -> 7c84e05e1


APEXCORE-201 Changed the way latency is calculated and fixed the problem where
the reported latency stalls when an operator falls too many windows behind


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/6e6d6b1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/6e6d6b1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/6e6d6b1a

Branch: refs/heads/master
Commit: 6e6d6b1a23d053a622184c28759ac83503c14d7d
Parents: 9e30cf8
Author: David Yan <[email protected]>
Authored: Tue Dec 29 17:35:04 2015 -0800
Committer: David Yan <[email protected]>
Committed: Thu Mar 10 18:02:35 2016 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 217 +++++++++----------
 .../datatorrent/stram/engine/GenericNode.java   |   5 +-
 .../com/datatorrent/stram/engine/InputNode.java |   4 +-
 .../stram/plan/logical/LogicalPlan.java         |  18 +-
 .../stram/plan/physical/PhysicalPlan.java       |   9 +
 .../stram/StreamingContainerManagerTest.java    |  84 +++++++
 6 files changed, 215 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index df3bfc4..1e76010 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -171,6 +171,7 @@ public class StreamingContainerManager implements 
PlanContext
   private RecoveryHandler recoveryHandler;
   // window id to node id to end window stats
   private final ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>> 
endWindowStatsOperatorMap = new ConcurrentSkipListMap<Long, Map<Integer, 
EndWindowStats>>();
+  private final ConcurrentMap<PTOperator, PTOperator> slowestUpstreamOp = new 
ConcurrentHashMap<>();
   private long committedWindowId;
   // (operator id, port name) to timestamp
   private final Map<Pair<Integer, String>, Long> 
operatorPortLastEndWindowTimestamps = Maps.newConcurrentMap();
@@ -230,7 +231,24 @@ public class StreamingContainerManager implements 
PlanContext
   public static class CriticalPathInfo
   {
     long latency;
-    LinkedList<Integer> path = new LinkedList<Integer>();
+    final LinkedList<Integer> path;
+
+    public CriticalPathInfo()
+    {
+      this.path = new LinkedList<>();
+    }
+
+    private CriticalPathInfo(long latency, LinkedList<Integer> path)
+    {
+      this.latency = latency;
+      this.path = path;
+    }
+
+    @Override
+    protected Object clone() throws CloneNotSupportedException
+    {
+      return new CriticalPathInfo(this.latency, 
(LinkedList<Integer>)this.path.clone());
+    }
   }
 
   private static class SetOperatorProperty implements Recoverable
@@ -766,6 +784,7 @@ public class StreamingContainerManager implements 
PlanContext
         Set<Integer> endWindowStatsOperators = endWindowStatsMap.keySet();
 
         aggregateMetrics(windowId, endWindowStatsMap);
+        criticalPathInfo = findCriticalPath();
 
         if (allCurrentOperators.containsAll(endWindowStatsOperators)) {
           if (endWindowStatsMap.size() < numOperators) {
@@ -778,22 +797,6 @@ public class StreamingContainerManager implements 
PlanContext
             }
           }
           else {
-            // collected data from all operators for this window id.  start 
latency calculation
-            List<OperatorMeta> rootOperatorMetas = 
plan.getLogicalPlan().getRootOperators();
-            Set<PTOperator> endWindowStatsVisited = new HashSet<PTOperator>();
-            Set<PTOperator> leafOperators = new HashSet<PTOperator>();
-            for (OperatorMeta root : rootOperatorMetas) {
-              List<PTOperator> rootOperators = plan.getOperators(root);
-              for (PTOperator rootOperator : rootOperators) {
-                // DFS for visiting the operators for latency calculation
-                LOG.debug("Calculating latency starting from operator {}", 
rootOperator.getId());
-                calculateLatency(rootOperator, endWindowStatsMap, 
endWindowStatsVisited, leafOperators);
-              }
-            }
-            CriticalPathInfo cpi = new CriticalPathInfo();
-            //LOG.debug("Finding critical path...");
-            cpi.latency = findCriticalPath(endWindowStatsMap, leafOperators, 
cpi.path);
-            criticalPathInfo = cpi;
             endWindowStatsOperatorMap.remove(windowId);
             currentEndWindowStatsWindowId = windowId;
           }
@@ -919,106 +922,41 @@ public class StreamingContainerManager implements 
PlanContext
     return logicalMetrics.get(operatorName);
   }
 
-  private void calculateLatency(PTOperator oper, Map<Integer, EndWindowStats> 
endWindowStatsMap, Set<PTOperator> endWindowStatsVisited, Set<PTOperator> 
leafOperators)
+  private CriticalPathInfo findCriticalPath()
   {
-    endWindowStatsVisited.add(oper);
-    OperatorStatus operatorStatus = oper.stats;
-
-    EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId());
-    if (endWindowStats == null) {
-      LOG.info("End window stats is null for operator {}, probably a new 
operator after partitioning", oper);
-      return;
-    }
-
-    // find the maximum end window emit time from all input ports
-    long upstreamMaxEmitTimestamp = -1;
-    PTOperator upstreamMaxEmitTimestampOperator = null;
-    for (PTOperator.PTInput input : oper.getInputs()) {
-      if (null != input.source.source) {
-        PTOperator upstreamOp = input.source.source;
-        EndWindowStats upstreamEndWindowStats = 
endWindowStatsMap.get(upstreamOp.getId());
-        if (upstreamEndWindowStats == null) {
-          LOG.info("End window stats is null for operator {}", oper);
-          return;
-        }
-        long adjustedEndWindowEmitTimestamp = 
upstreamEndWindowStats.emitTimestamp;
-        MovingAverageLong rpcLatency = 
rpcLatencies.get(upstreamOp.getContainer().getExternalId());
-        if (rpcLatency != null) {
-          adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
-        }
-        if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) {
-          upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp;
-          upstreamMaxEmitTimestampOperator = upstreamOp;
-        }
-      }
-    }
-
-    if (upstreamMaxEmitTimestamp > 0) {
-      long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
-      MovingAverageLong rpcLatency = 
rpcLatencies.get(oper.getContainer().getExternalId());
-      if (rpcLatency != null) {
-        adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
-      }
-      if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) {
-        LOG.debug("Adding {} to latency MA for {}", 
adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp, oper);
-        operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp - 
upstreamMaxEmitTimestamp);
-      } else {
-        operatorStatus.latencyMA.add(0);
-        if (lastLatencyWarningTime < System.currentTimeMillis() - 
LATENCY_WARNING_THRESHOLD_MILLIS) {
-          LOG.warn("Latency calculation for this operator may not be correct 
because upstream end window timestamp is greater than this operator's end 
window timestamp: {} ({}) > {} ({}). Please verify that the system clocks are 
in sync in your cluster. You can also try tweaking the 
RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to {}).",
-                  upstreamMaxEmitTimestamp, upstreamMaxEmitTimestampOperator, 
adjustedEndWindowEmitTimestamp, oper, this.vars.rpcLatencyCompensationSamples);
-          lastLatencyWarningTime = System.currentTimeMillis();
-        }
-      }
-    }
-
-    if (oper.getOutputs().isEmpty()) {
-      // it is a leaf operator
-      leafOperators.add(oper);
-    }
-    else {
-      for (PTOperator.PTOutput output : oper.getOutputs()) {
-        for (PTOperator.PTInput input : output.sinks) {
-          if (input.target != null) {
-            PTOperator downStreamOp = input.target;
-            if (!endWindowStatsVisited.contains(downStreamOp)) {
-              calculateLatency(downStreamOp, endWindowStatsMap, 
endWindowStatsVisited, leafOperators);
-            }
-          }
-        }
+    CriticalPathInfo result = null;
+    List<PTOperator> leafOperators = plan.getLeafOperators();
+    Map<PTOperator, CriticalPathInfo> cache = new HashMap<>();
+    for (PTOperator leafOperator : leafOperators) {
+      CriticalPathInfo cpi = findCriticalPathHelper(leafOperator, cache);
+      if (result == null || result.latency < cpi.latency) {
+        result = cpi;
       }
     }
+    return result;
   }
-  /*
-   * returns cumulative latency
-   */
-  private long findCriticalPath(Map<Integer, EndWindowStats> 
endWindowStatsMap, Set<PTOperator> operators, LinkedList<Integer> criticalPath)
-  {
-    long maxEndWindowTimestamp = 0;
-    PTOperator maxOperator = null;
-    for (PTOperator operator : operators) {
-      EndWindowStats endWindowStats = endWindowStatsMap.get(operator.getId());
-      if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) {
-        maxEndWindowTimestamp = endWindowStats.emitTimestamp;
-        maxOperator = operator;
-      }
-    }
-    if (maxOperator == null) {
-      return 0;
-    }
-    criticalPath.addFirst(maxOperator.getId());
-    OperatorStatus operatorStatus = maxOperator.stats;
 
-    operators.clear();
-    if (maxOperator.getInputs() == null || maxOperator.getInputs().isEmpty()) {
-      return operatorStatus.latencyMA.getAvg();
+  private CriticalPathInfo findCriticalPathHelper(PTOperator operator, 
Map<PTOperator, CriticalPathInfo> cache)
+  {
+    CriticalPathInfo cpi = cache.get(operator);
+    if (cpi != null) {
+      return cpi;
     }
-    for (PTOperator.PTInput input : maxOperator.getInputs()) {
-      if (null != input.source.source && !input.delay) {
-        operators.add(input.source.source);
+    PTOperator slowestUpstreamOperator = slowestUpstreamOp.get(operator);
+    if (slowestUpstreamOperator != null) {
+      cpi = findCriticalPathHelper(slowestUpstreamOperator, cache);
+      try {
+        cpi = (CriticalPathInfo)cpi.clone();
+      } catch (CloneNotSupportedException ex) {
+        throw new RuntimeException();
       }
+    } else {
+      cpi = new CriticalPathInfo();
     }
-    return operatorStatus.latencyMA.getAvg() + 
findCriticalPath(endWindowStatsMap, operators, criticalPath);
+    cpi.latency += operator.stats.getLatencyMA();
+    cpi.path.addLast(operator.getId());
+    cache.put(operator, cpi);
+    return cpi;
   }
 
   public int processEvents()
@@ -1338,7 +1276,7 @@ public class StreamingContainerManager implements 
PlanContext
     switch (oper.getState()) {
       case ACTIVE:
         // Commented out the warning below because it's expected when the 
operator does something
-        // quickly and goes out of commission, it will report SHUTDOWN 
correcly whereas this code
+        // quickly and goes out of commission, it will report SHUTDOWN 
correctly whereas this code
         // is incorrectly expecting ACTIVE to be reported.
         //LOG.warn("status out of sync {} expected {} remote {}", oper, 
oper.getState(), ds);
         // operator expected active, check remote status
@@ -1363,12 +1301,14 @@ public class StreamingContainerManager implements 
PlanContext
                 deactivatedOpers.add(oper);
               }
               sca.undeployOpers.add(oper.getId());
+              slowestUpstreamOp.remove(oper);
               // record operator stop event
               recordEventAsync(new 
StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), 
oper.getContainer().getExternalId()));
               break;
             case FAILED:
               processOperatorFailure(oper);
               sca.undeployOpers.add(oper.getId());
+              slowestUpstreamOp.remove(oper);
               recordEventAsync(new 
StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), 
oper.getContainer().getExternalId()));
               break;
             case ACTIVE:
@@ -1386,6 +1326,7 @@ public class StreamingContainerManager implements 
PlanContext
         else {
           // operator is currently deployed, request undeploy
           sca.undeployOpers.add(oper.getId());
+          slowestUpstreamOp.remove(oper);
         }
         break;
       case PENDING_DEPLOY:
@@ -1408,6 +1349,7 @@ public class StreamingContainerManager implements 
PlanContext
         if (ds != null) {
           // operator was removed and needs to be undeployed from container
           sca.undeployOpers.add(oper.getId());
+          slowestUpstreamOp.remove(oper);
           recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), 
oper.getId(), oper.getContainer().getExternalId()));
         }
     }
@@ -1606,8 +1548,11 @@ public class StreamingContainerManager implements 
PlanContext
               tuplesProcessed += s.tupleCount;
               endWindowStats.dequeueTimestamps.put(s.id, s.endWindowTimestamp);
 
-              Pair<Integer, String> operatorPortName = new Pair<Integer, 
String>(oper.getId(), s.id);
-              long lastEndWindowTimestamp = 
operatorPortLastEndWindowTimestamps.containsKey(operatorPortName) ? 
operatorPortLastEndWindowTimestamps.get(operatorPortName) : lastStatsTimestamp;
+              Pair<Integer, String> operatorPortName = new 
Pair<>(oper.getId(), s.id);
+              Long lastEndWindowTimestamp = 
operatorPortLastEndWindowTimestamps.get(operatorPortName);
+              if (lastEndWindowTimestamp == null) {
+                lastEndWindowTimestamp = lastStatsTimestamp;
+              }
               long portElapsedMillis = Math.max(s.endWindowTimestamp - 
lastEndWindowTimestamp, 0);
               //LOG.debug("=== PROCESSED TUPLE COUNT for {}: {}, {}, {}, {}", 
operatorPortName, s.tupleCount, portElapsedMillis, 
operatorPortLastEndWindowTimestamps.get(operatorPortName), lastStatsTimestamp);
               ps.tuplesPMSMA.add(s.tupleCount, portElapsedMillis);
@@ -1647,13 +1592,16 @@ public class StreamingContainerManager implements 
PlanContext
               ps.recordingId = s.recordingId;
 
               tuplesEmitted += s.tupleCount;
-              Pair<Integer, String> operatorPortName = new Pair<Integer, 
String>(oper.getId(), s.id);
-
-              long lastEndWindowTimestamp = 
operatorPortLastEndWindowTimestamps.containsKey(operatorPortName) ? 
operatorPortLastEndWindowTimestamps.get(operatorPortName) : lastStatsTimestamp;
+              Pair<Integer, String> operatorPortName = new 
Pair<>(oper.getId(), s.id);
+              Long lastEndWindowTimestamp = 
operatorPortLastEndWindowTimestamps.get(operatorPortName);
+              if (lastEndWindowTimestamp == null) {
+                lastEndWindowTimestamp = lastStatsTimestamp;
+              }
               long portElapsedMillis = Math.max(s.endWindowTimestamp - 
lastEndWindowTimestamp, 0);
               //LOG.debug("=== EMITTED TUPLE COUNT for {}: {}, {}, {}, {}", 
operatorPortName, s.tupleCount, portElapsedMillis, 
operatorPortLastEndWindowTimestamps.get(operatorPortName), lastStatsTimestamp);
               ps.tuplesPMSMA.add(s.tupleCount, portElapsedMillis);
               ps.bufferServerBytesPMSMA.add(s.bufferServerBytes, 
portElapsedMillis);
+
               operatorPortLastEndWindowTimestamps.put(operatorPortName, 
s.endWindowTimestamp);
               if (maxEndWindowTimestamp < s.endWindowTimestamp) {
                 maxEndWindowTimestamp = s.endWindowTimestamp;
@@ -1701,6 +1649,45 @@ public class StreamingContainerManager implements 
PlanContext
             }
             endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
 
+            if (!oper.getInputs().isEmpty()) {
+              long latency = Long.MAX_VALUE;
+              long adjustedEndWindowEmitTimestamp = 
endWindowStats.emitTimestamp;
+              MovingAverageLong rpcLatency = 
rpcLatencies.get(oper.getContainer().getExternalId());
+              if (rpcLatency != null) {
+                adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
+              }
+              PTOperator slowestUpstream = null;
+              for (PTInput input : oper.getInputs()) {
+                PTOperator upstreamOp = input.source.source;
+                if (upstreamOp.getOperatorMeta().getOperator() instanceof 
Operator.DelayOperator) {
+                  continue;
+                }
+                EndWindowStats ews = endWindowStatsMap.get(upstreamOp.getId());
+                long portLatency;
+                if (ews == null) {
+                  // This is when the operator is likely to be behind too many 
windows. We need to give an estimate for
+                  // latency at this point, by looking at the number of 
windows behind
+                  int widthMillis = 
plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
+                  portLatency = (upstreamOp.stats.currentWindowId.get() - 
oper.stats.currentWindowId.get()) * widthMillis;
+                } else {
+                  MovingAverageLong upstreamRPCLatency = 
rpcLatencies.get(upstreamOp.getContainer().getExternalId());
+                  portLatency = adjustedEndWindowEmitTimestamp - 
ews.emitTimestamp;
+                  if (upstreamRPCLatency != null) {
+                    portLatency -= upstreamRPCLatency.getAvg();
+                  }
+                }
+                if (portLatency < 0) {
+                  portLatency = 0;
+                }
+                if (latency > portLatency) {
+                  latency = portLatency;
+                  slowestUpstream = upstreamOp;
+                }
+              }
+              status.latencyMA.add(latency);
+              slowestUpstreamOp.put(oper, slowestUpstream);
+            }
+
             Set<Integer> allCurrentOperators = plan.getAllOperators().keySet();
             int numOperators = plan.getAllOperators().size();
             if (allCurrentOperators.containsAll(endWindowStatsMap.keySet()) && 
endWindowStatsMap.size() == numOperators) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java 
b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 1ccec31..5acb15d 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -138,13 +138,12 @@ public class GenericNode extends Node<Operator>
    */
   protected void processEndWindow(Tuple endWindowTuple)
   {
-    endWindowEmitTime = System.currentTimeMillis();
-
     if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {
       insideWindow = false;
       operator.endWindow();
       applicationWindowCount = 0;
     }
+    endWindowEmitTime = System.currentTimeMillis();
 
     if (endWindowTuple == null) {
       emitEndWindow();
@@ -636,8 +635,8 @@ public class GenericNode extends Node<Operator>
      * TODO: as using a listener callback
      */
     if (insideWindow && !shutdown) {
-      endWindowEmitTime = System.currentTimeMillis();
       operator.endWindow();
+      endWindowEmitTime = System.currentTimeMillis();
       if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {
         applicationWindowCount = 0;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java 
b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
index 92a61f0..0bd04ef 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
@@ -122,13 +122,13 @@ public class InputNode extends Node<InputOperator>
               break;
 
             case END_WINDOW:
-              endWindowEmitTime = System.currentTimeMillis();
               insideStreamingWindow = false;
               if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {
                 insideApplicationWindow = false;
                 operator.endWindow();
                 applicationWindowCount = 0;
               }
+              endWindowEmitTime = System.currentTimeMillis();
 
               for (int i = sinks.length; i-- > 0;) {
                 sinks[i].put(t);
@@ -218,8 +218,8 @@ public class InputNode extends Node<InputOperator>
     }
 
     if (insideApplicationWindow) {
-      endWindowEmitTime = System.currentTimeMillis();
       operator.endWindow();
+      endWindowEmitTime = System.currentTimeMillis();
       if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {
         applicationWindowCount = 0;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 6d7ebe1..173298b 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -158,6 +158,7 @@ public class LogicalPlan implements Serializable, DAG
   private final Map<String, OperatorMeta> operators = new HashMap<String, 
OperatorMeta>();
   public final Map<String, ModuleMeta> modules = new LinkedHashMap<>();
   private final List<OperatorMeta> rootOperators = new 
ArrayList<OperatorMeta>();
+  private final List<OperatorMeta> leafOperators = new ArrayList<>();
   private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
   private transient Map<String, ArrayListMultimap<OutputPort<?>, 
InputPort<?>>> streamLinks = new HashMap<>();
 
@@ -484,7 +485,9 @@ public class LogicalPlan implements Serializable, DAG
       sinks.add(portMeta);
       om.inputStreams.put(portMeta, this);
       rootOperators.remove(portMeta.operatorMeta);
-
+      if (this.source != null && !(this.source.getOperatorMeta().getOperator() 
instanceof Operator.DelayOperator)) {
+        leafOperators.remove(this.source.getOperatorMeta());
+      }
       return this;
     }
 
@@ -512,6 +515,10 @@ public class LogicalPlan implements Serializable, DAG
       this.sinks.clear();
       if (this.source != null) {
         this.source.getOperatorMeta().outputStreams.remove(this.source);
+        if (this.source.getOperatorMeta().outputStreams.isEmpty() &&
+            !(this.source.getOperatorMeta().getOperator() instanceof 
Operator.DelayOperator)) {
+          leafOperators.remove(this.source.getOperatorMeta());
+        }
       }
       this.source = null;
       streams.remove(this.id);
@@ -1142,6 +1149,7 @@ public class LogicalPlan implements Serializable, DAG
     }
     OperatorMeta decl = new OperatorMeta(name, operator);
     rootOperators.add(decl); // will be removed when a sink is added to an 
input port for this operator
+    leafOperators.add(decl); // will be removed when a sink is added to an 
output port for this operator
     operators.put(name, decl);
     return operator;
   }
@@ -1332,6 +1340,7 @@ public class LogicalPlan implements Serializable, DAG
     }
     this.operators.remove(om.getName());
     rootOperators.remove(om);
+    leafOperators.remove(om);
   }
 
   @Override
@@ -1517,6 +1526,11 @@ public class LogicalPlan implements Serializable, DAG
     return Collections.unmodifiableList(this.rootOperators);
   }
 
+  public List<OperatorMeta> getLeafOperators()
+  {
+    return Collections.unmodifiableList(this.leafOperators);
+  }
+
   public Collection<OperatorMeta> getAllOperators()
   {
     return Collections.unmodifiableCollection(this.operators.values());
@@ -1923,7 +1937,7 @@ public class LogicalPlan implements Serializable, DAG
    * @see <a 
href="http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm";>http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm</a>
    *
    * @param om
-   * @param cycles
+   * @param ctx
    */
   public void findStronglyConnected(OperatorMeta om, ValidationContext ctx)
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index c696224..7d24633 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1356,6 +1356,15 @@ public class PhysicalPlan implements Serializable
     return this.logicalToPTOperator.get(logicalOperator).getAllOperators();
   }
 
+  public List<PTOperator> getLeafOperators()
+  {
+    List<PTOperator> operators = new ArrayList<>();
+    for (OperatorMeta opMeta : dag.getLeafOperators()) {
+      operators.addAll(getAllOperators(opMeta));
+    }
+    return operators;
+  }
+
   public boolean hasMapping(OperatorMeta om) {
     return this.logicalToPTOperator.containsKey(om);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e6d6b1a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 8fc957b..73c2ae0 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.Locality;
@@ -77,6 +78,7 @@ import 
com.datatorrent.stram.support.StramTestSupport.EmbeddedWebSocketServer;
 import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
 import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 import com.datatorrent.stram.tuple.Tuple;
+import com.datatorrent.stram.webapp.LogicalOperatorInfo;
 
 import org.apache.commons.lang.StringUtils;
 import org.codehaus.jettison.json.JSONObject;
@@ -1010,4 +1012,86 @@ public class StreamingContainerManagerTest
     String msg = TestMetricTransport.messages.get(0);
     Assert.assertTrue(msg.startsWith("xyz:"));
   }
+
+  public static class HighLatencyTestOperator extends GenericTestOperator
+  {
+    private long firstWindowMillis;
+    private long windowWidthMillis;
+    private long currentWindowId;
+    private long latency;
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      firstWindowMillis = System.currentTimeMillis();
+      // this is an approximation because there is no way to get to the actual 
value in the DAG
+
+      windowWidthMillis = 
context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      currentWindowId = windowId;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      long sleepMillis = latency -
+          (System.currentTimeMillis() - 
WindowGenerator.getWindowMillis(currentWindowId, firstWindowMillis, 
windowWidthMillis));
+
+      if (sleepMillis > 0) {
+        try {
+          Thread.sleep(sleepMillis);
+        } catch (InterruptedException ex) {
+          // move on
+        }
+      }
+    }
+
+    public void setLatency(long latency)
+    {
+      this.latency = latency;
+    }
+
+  }
+
+  @Test
+  public void testLatency() throws Exception
+  {
+    TestGeneratorInputOperator o1 = dag.addOperator("o1", 
TestGeneratorInputOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    HighLatencyTestOperator o3 = dag.addOperator("o3", 
HighLatencyTestOperator.class);
+    GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
+    long latency = 5000; // 5 seconds
+    o3.setLatency(latency);
+    dag.addStream("o1.outport", o1.outport, o2.inport1, o3.inport1);
+    dag.addStream("o2.outport1", o2.outport1, o4.inport1);
+    dag.addStream("o3.outport1", o3.outport1, o4.inport2);
+    dag.setAttribute(Context.DAGContext.STATS_MAX_ALLOWABLE_WINDOWS_LAG, 2); 
// 1 second
+    StramLocalCluster lc = new StramLocalCluster(dag);
+    StreamingContainerManager dnmgr = lc.dnmgr;
+    lc.runAsync();
+    Thread.sleep(10000);
+    LogicalOperatorInfo o1Info = dnmgr.getLogicalOperatorInfo("o1");
+    LogicalOperatorInfo o2Info = dnmgr.getLogicalOperatorInfo("o2");
+    LogicalOperatorInfo o3Info = dnmgr.getLogicalOperatorInfo("o3");
+    LogicalOperatorInfo o4Info = dnmgr.getLogicalOperatorInfo("o4");
+
+    Assert.assertEquals("Input operator latency must be zero", 0, 
o1Info.latencyMA);
+    Assert.assertTrue("Latency must be greater than or equal to zero", 
o2Info.latencyMA >= 0);
+    Assert.assertTrue("Actual latency must be greater than the artificially 
introduced latency",
+        o3Info.latencyMA > latency);
+    Assert.assertTrue("Latency must be greater than or equal to zero", 
o4Info.latencyMA >= 0);
+    StreamingContainerManager.CriticalPathInfo criticalPathInfo = 
dnmgr.getCriticalPathInfo();
+    Assert.assertArrayEquals("Critical Path must be the path in the DAG that 
includes the HighLatencyTestOperator",
+        new Integer[]{o1Info.partitions.iterator().next(),
+            o3Info.partitions.iterator().next(),
+            o4Info.partitions.iterator().next()},
+        criticalPathInfo.path.toArray());
+    Assert.assertTrue("Whole DAG latency must be greater than the artificially 
introduced latency",
+        criticalPathInfo.latency > latency);
+    lc.shutdown();
+  }
 }

Reply via email to