[
https://issues.apache.org/jira/browse/APEXCORE-201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114001#comment-15114001
]
ASF GitHub Bot commented on APEXCORE-201:
-----------------------------------------
Github user davidyan74 commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/194#discussion_r50626827
--- Diff:
engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
@@ -913,106 +899,28 @@ private void saveMetaInfo() throws IOException
return logicalMetrics.get(operatorName);
}
- private void calculateLatency(PTOperator oper, Map<Integer,
EndWindowStats> endWindowStatsMap, Set<PTOperator> endWindowStatsVisited,
Set<PTOperator> leafOperators)
+ private CriticalPathInfo findCriticalPath()
{
- endWindowStatsVisited.add(oper);
- OperatorStatus operatorStatus = oper.stats;
-
- EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId());
- if (endWindowStats == null) {
- LOG.info("End window stats is null for operator {}, probably a new
operator after partitioning", oper);
- return;
- }
-
- // find the maximum end window emit time from all input ports
- long upstreamMaxEmitTimestamp = -1;
- PTOperator upstreamMaxEmitTimestampOperator = null;
- for (PTOperator.PTInput input : oper.getInputs()) {
- if (null != input.source.source) {
- PTOperator upstreamOp = input.source.source;
- EndWindowStats upstreamEndWindowStats =
endWindowStatsMap.get(upstreamOp.getId());
- if (upstreamEndWindowStats == null) {
- LOG.info("End window stats is null for operator {}", oper);
- return;
- }
- long adjustedEndWindowEmitTimestamp =
upstreamEndWindowStats.emitTimestamp;
- MovingAverageLong rpcLatency =
rpcLatencies.get(upstreamOp.getContainer().getExternalId());
- if (rpcLatency != null) {
- adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
- }
- if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) {
- upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp;
- upstreamMaxEmitTimestampOperator = upstreamOp;
- }
- }
- }
-
- if (upstreamMaxEmitTimestamp > 0) {
- long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
- MovingAverageLong rpcLatency =
rpcLatencies.get(oper.getContainer().getExternalId());
- if (rpcLatency != null) {
- adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
- }
- if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) {
- LOG.debug("Adding {} to latency MA for {}",
adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp, oper);
- operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp -
upstreamMaxEmitTimestamp);
- } else {
- operatorStatus.latencyMA.add(0);
- if (lastLatencyWarningTime < System.currentTimeMillis() -
LATENCY_WARNING_THRESHOLD_MILLIS) {
- LOG.warn("Latency calculation for this operator may not be
correct because upstream end window timestamp is greater than this operator's
end window timestamp: {} ({}) > {} ({}). Please verify that the system clocks
are in sync in your cluster. You can also try tweaking the
RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to {}).",
- upstreamMaxEmitTimestamp,
upstreamMaxEmitTimestampOperator, adjustedEndWindowEmitTimestamp, oper,
this.vars.rpcLatencyCompensationSamples);
- lastLatencyWarningTime = System.currentTimeMillis();
- }
- }
- }
-
- if (oper.getOutputs().isEmpty()) {
- // it is a leaf operator
- leafOperators.add(oper);
- }
- else {
- for (PTOperator.PTOutput output : oper.getOutputs()) {
- for (PTOperator.PTInput input : output.sinks) {
- if (input.target != null) {
- PTOperator downStreamOp = input.target;
- if (!endWindowStatsVisited.contains(downStreamOp)) {
- calculateLatency(downStreamOp, endWindowStatsMap,
endWindowStatsVisited, leafOperators);
- }
- }
- }
+ CriticalPathInfo result = null;
+ List<PTOperator> leafOperators = plan.getLeafOperators();
+ for (PTOperator leafOperator : leafOperators) {
+ CriticalPathInfo cpi = new CriticalPathInfo();
+ findCriticalPathHelper(leafOperator, cpi);
+ if (result == null || result.latency < cpi.latency) {
+ result = cpi;
}
}
+ return result;
}
- /*
- * returns cumulative latency
- */
- private long findCriticalPath(Map<Integer, EndWindowStats>
endWindowStatsMap, Set<PTOperator> operators, LinkedList<Integer> criticalPath)
- {
- long maxEndWindowTimestamp = 0;
- PTOperator maxOperator = null;
- for (PTOperator operator : operators) {
- EndWindowStats endWindowStats =
endWindowStatsMap.get(operator.getId());
- if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) {
- maxEndWindowTimestamp = endWindowStats.emitTimestamp;
- maxOperator = operator;
- }
- }
- if (maxOperator == null) {
- return 0;
- }
- criticalPath.addFirst(maxOperator.getId());
- OperatorStatus operatorStatus = maxOperator.stats;
- operators.clear();
- if (maxOperator.getInputs() == null ||
maxOperator.getInputs().isEmpty()) {
- return operatorStatus.latencyMA.getAvg();
- }
- for (PTOperator.PTInput input : maxOperator.getInputs()) {
- if (null != input.source.source) {
- operators.add(input.source.source);
- }
+ private void findCriticalPathHelper(PTOperator operator,
CriticalPathInfo cpi)
+ {
+ cpi.latency += operator.stats.getLatencyMA();
--- End diff --
@gauravgopi123 Yes, you are correct. We actually need to change this code
anyway because the iteration feature has been merged. Will do that when I come
back.
> Reported latency is wrong when a downstream operator is behind by more than 1000
> windows
> -------------------------------------------------------------------------------------
>
> Key: APEXCORE-201
> URL: https://issues.apache.org/jira/browse/APEXCORE-201
> Project: Apache Apex Core
> Issue Type: Bug
> Reporter: David Yan
> Assignee: David Yan
>
> When that happens, we should probably estimate the latency from the number of
> windows the operator is behind. Right now it reports a stale latency.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)