Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/230#discussion_r54943874
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -919,106 +914,41 @@ private void saveMetaInfo() throws IOException
         return logicalMetrics.get(operatorName);
       }
     
    -  private void calculateLatency(PTOperator oper, Map<Integer, 
EndWindowStats> endWindowStatsMap, Set<PTOperator> endWindowStatsVisited, 
Set<PTOperator> leafOperators)
    +  private CriticalPathInfo findCriticalPath()
       {
    -    endWindowStatsVisited.add(oper);
    -    OperatorStatus operatorStatus = oper.stats;
    -
    -    EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId());
    -    if (endWindowStats == null) {
    -      LOG.info("End window stats is null for operator {}, probably a new 
operator after partitioning", oper);
    -      return;
    -    }
    -
    -    // find the maximum end window emit time from all input ports
    -    long upstreamMaxEmitTimestamp = -1;
    -    PTOperator upstreamMaxEmitTimestampOperator = null;
    -    for (PTOperator.PTInput input : oper.getInputs()) {
    -      if (null != input.source.source) {
    -        PTOperator upstreamOp = input.source.source;
    -        EndWindowStats upstreamEndWindowStats = 
endWindowStatsMap.get(upstreamOp.getId());
    -        if (upstreamEndWindowStats == null) {
    -          LOG.info("End window stats is null for operator {}", oper);
    -          return;
    -        }
    -        long adjustedEndWindowEmitTimestamp = 
upstreamEndWindowStats.emitTimestamp;
    -        MovingAverageLong rpcLatency = 
rpcLatencies.get(upstreamOp.getContainer().getExternalId());
    -        if (rpcLatency != null) {
    -          adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -        }
    -        if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) {
    -          upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp;
    -          upstreamMaxEmitTimestampOperator = upstreamOp;
    -        }
    -      }
    -    }
    -
    -    if (upstreamMaxEmitTimestamp > 0) {
    -      long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
    -      MovingAverageLong rpcLatency = 
rpcLatencies.get(oper.getContainer().getExternalId());
    -      if (rpcLatency != null) {
    -        adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -      }
    -      if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) {
    -        LOG.debug("Adding {} to latency MA for {}", 
adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp, oper);
    -        operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp - 
upstreamMaxEmitTimestamp);
    -      } else {
    -        operatorStatus.latencyMA.add(0);
    -        if (lastLatencyWarningTime < System.currentTimeMillis() - 
LATENCY_WARNING_THRESHOLD_MILLIS) {
    -          LOG.warn("Latency calculation for this operator may not be 
correct because upstream end window timestamp is greater than this operator's 
end window timestamp: {} ({}) > {} ({}). Please verify that the system clocks 
are in sync in your cluster. You can also try tweaking the 
RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to {}).",
    -                  upstreamMaxEmitTimestamp, 
upstreamMaxEmitTimestampOperator, adjustedEndWindowEmitTimestamp, oper, 
this.vars.rpcLatencyCompensationSamples);
    -          lastLatencyWarningTime = System.currentTimeMillis();
    -        }
    -      }
    -    }
    -
    -    if (oper.getOutputs().isEmpty()) {
    -      // it is a leaf operator
    -      leafOperators.add(oper);
    -    }
    -    else {
    -      for (PTOperator.PTOutput output : oper.getOutputs()) {
    -        for (PTOperator.PTInput input : output.sinks) {
    -          if (input.target != null) {
    -            PTOperator downStreamOp = input.target;
    -            if (!endWindowStatsVisited.contains(downStreamOp)) {
    -              calculateLatency(downStreamOp, endWindowStatsMap, 
endWindowStatsVisited, leafOperators);
    -            }
    -          }
    -        }
    +    CriticalPathInfo result = null;
    +    List<PTOperator> leafOperators = plan.getLeafOperators();
    +    Map<PTOperator, CriticalPathInfo> cache = new HashMap<>();
    +    for (PTOperator leafOperator : leafOperators) {
    +      CriticalPathInfo cpi = findCriticalPathHelper(leafOperator, cache);
    +      if (result == null || result.latency < cpi.latency) {
    +        result = cpi;
           }
         }
    +    return result;
       }
    -  /*
    -   * returns cumulative latency
    -   */
    -  private long findCriticalPath(Map<Integer, EndWindowStats> 
endWindowStatsMap, Set<PTOperator> operators, LinkedList<Integer> criticalPath)
    -  {
    -    long maxEndWindowTimestamp = 0;
    -    PTOperator maxOperator = null;
    -    for (PTOperator operator : operators) {
    -      EndWindowStats endWindowStats = 
endWindowStatsMap.get(operator.getId());
    -      if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) {
    -        maxEndWindowTimestamp = endWindowStats.emitTimestamp;
    -        maxOperator = operator;
    -      }
    -    }
    -    if (maxOperator == null) {
    -      return 0;
    -    }
    -    criticalPath.addFirst(maxOperator.getId());
    -    OperatorStatus operatorStatus = maxOperator.stats;
     
    -    operators.clear();
    -    if (maxOperator.getInputs() == null || 
maxOperator.getInputs().isEmpty()) {
    -      return operatorStatus.latencyMA.getAvg();
    +  private CriticalPathInfo findCriticalPathHelper(PTOperator operator, 
Map<PTOperator, CriticalPathInfo> cache)
    +  {
    +    if (cache.containsKey(operator)) {
    --- End diff --
    
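    Avoid the double hash map lookup? containsKey() followed by get() can be combined into a single get() with a null check on the result (assuming the cache never stores null values).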


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it to be, or if the feature is enabled but not
working, please contact infrastructure at [email protected] or file a JIRA
ticket with INFRA.
---

Reply via email to