[ 
https://issues.apache.org/jira/browse/APEXCORE-201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114001#comment-15114001
 ] 

ASF GitHub Bot commented on APEXCORE-201:
-----------------------------------------

Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50626827
  
    --- Diff: 
engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -913,106 +899,28 @@ private void saveMetaInfo() throws IOException
         return logicalMetrics.get(operatorName);
       }
     
    -  private void calculateLatency(PTOperator oper, Map<Integer, 
EndWindowStats> endWindowStatsMap, Set<PTOperator> endWindowStatsVisited, 
Set<PTOperator> leafOperators)
    +  private CriticalPathInfo findCriticalPath()
       {
    -    endWindowStatsVisited.add(oper);
    -    OperatorStatus operatorStatus = oper.stats;
    -
    -    EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId());
    -    if (endWindowStats == null) {
    -      LOG.info("End window stats is null for operator {}, probably a new 
operator after partitioning", oper);
    -      return;
    -    }
    -
    -    // find the maximum end window emit time from all input ports
    -    long upstreamMaxEmitTimestamp = -1;
    -    PTOperator upstreamMaxEmitTimestampOperator = null;
    -    for (PTOperator.PTInput input : oper.getInputs()) {
    -      if (null != input.source.source) {
    -        PTOperator upstreamOp = input.source.source;
    -        EndWindowStats upstreamEndWindowStats = 
endWindowStatsMap.get(upstreamOp.getId());
    -        if (upstreamEndWindowStats == null) {
    -          LOG.info("End window stats is null for operator {}", oper);
    -          return;
    -        }
    -        long adjustedEndWindowEmitTimestamp = 
upstreamEndWindowStats.emitTimestamp;
    -        MovingAverageLong rpcLatency = 
rpcLatencies.get(upstreamOp.getContainer().getExternalId());
    -        if (rpcLatency != null) {
    -          adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -        }
    -        if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) {
    -          upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp;
    -          upstreamMaxEmitTimestampOperator = upstreamOp;
    -        }
    -      }
    -    }
    -
    -    if (upstreamMaxEmitTimestamp > 0) {
    -      long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
    -      MovingAverageLong rpcLatency = 
rpcLatencies.get(oper.getContainer().getExternalId());
    -      if (rpcLatency != null) {
    -        adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -      }
    -      if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) {
    -        LOG.debug("Adding {} to latency MA for {}", 
adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp, oper);
    -        operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp - 
upstreamMaxEmitTimestamp);
    -      } else {
    -        operatorStatus.latencyMA.add(0);
    -        if (lastLatencyWarningTime < System.currentTimeMillis() - 
LATENCY_WARNING_THRESHOLD_MILLIS) {
    -          LOG.warn("Latency calculation for this operator may not be 
correct because upstream end window timestamp is greater than this operator's 
end window timestamp: {} ({}) > {} ({}). Please verify that the system clocks 
are in sync in your cluster. You can also try tweaking the 
RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to {}).",
    -                  upstreamMaxEmitTimestamp, 
upstreamMaxEmitTimestampOperator, adjustedEndWindowEmitTimestamp, oper, 
this.vars.rpcLatencyCompensationSamples);
    -          lastLatencyWarningTime = System.currentTimeMillis();
    -        }
    -      }
    -    }
    -
    -    if (oper.getOutputs().isEmpty()) {
    -      // it is a leaf operator
    -      leafOperators.add(oper);
    -    }
    -    else {
    -      for (PTOperator.PTOutput output : oper.getOutputs()) {
    -        for (PTOperator.PTInput input : output.sinks) {
    -          if (input.target != null) {
    -            PTOperator downStreamOp = input.target;
    -            if (!endWindowStatsVisited.contains(downStreamOp)) {
    -              calculateLatency(downStreamOp, endWindowStatsMap, 
endWindowStatsVisited, leafOperators);
    -            }
    -          }
    -        }
    +    CriticalPathInfo result = null;
    +    List<PTOperator> leafOperators = plan.getLeafOperators();
    +    for (PTOperator leafOperator : leafOperators) {
    +      CriticalPathInfo cpi = new CriticalPathInfo();
    +      findCriticalPathHelper(leafOperator, cpi);
    +      if (result == null || result.latency < cpi.latency) {
    +        result = cpi;
           }
         }
    +    return result;
       }
    -  /*
    -   * returns cumulative latency
    -   */
    -  private long findCriticalPath(Map<Integer, EndWindowStats> 
endWindowStatsMap, Set<PTOperator> operators, LinkedList<Integer> criticalPath)
    -  {
    -    long maxEndWindowTimestamp = 0;
    -    PTOperator maxOperator = null;
    -    for (PTOperator operator : operators) {
    -      EndWindowStats endWindowStats = 
endWindowStatsMap.get(operator.getId());
    -      if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) {
    -        maxEndWindowTimestamp = endWindowStats.emitTimestamp;
    -        maxOperator = operator;
    -      }
    -    }
    -    if (maxOperator == null) {
    -      return 0;
    -    }
    -    criticalPath.addFirst(maxOperator.getId());
    -    OperatorStatus operatorStatus = maxOperator.stats;
     
    -    operators.clear();
    -    if (maxOperator.getInputs() == null || 
maxOperator.getInputs().isEmpty()) {
    -      return operatorStatus.latencyMA.getAvg();
    -    }
    -    for (PTOperator.PTInput input : maxOperator.getInputs()) {
    -      if (null != input.source.source) {
    -        operators.add(input.source.source);
    -      }
    +  private void findCriticalPathHelper(PTOperator operator, 
CriticalPathInfo cpi)
    +  {
    +    cpi.latency += operator.stats.getLatencyMA();
    --- End diff --
    
    @gauravgopi123 Yes, you are correct. We need to change this code anyway,
because the iteration feature has been merged. I will make that change when I
am back.


> Reported latency is wrong when a downstream operator is behind more than 1000 
> windows
> -------------------------------------------------------------------------------------
>
>                 Key: APEXCORE-201
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-201
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: David Yan
>            Assignee: David Yan
>
> When that happens, we should probably estimate the latency from the number of
> windows the operator is behind. Right now it reports a stale latency value.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to