[ 
https://issues.apache.org/jira/browse/FLINK-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15604780#comment-15604780
 ] 

ASF GitHub Bot commented on FLINK-4733:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2616#discussion_r84697528
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---
    @@ -99,11 +83,34 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
                        gen.writeNumberField("end-time", endTime);
                        gen.writeNumberField("duration", duration);
     
    +                   IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
    +
    +                   long numBytesIn = 0;
    +                   long numBytesOut = 0;
    +                   long numRecordsIn = 0;
    +                   long numRecordsOut = 0;
    +
    +                   if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
    +                           numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
    +                           numBytesOut = ioMetrics.getNumBytesOut();
    +                           numRecordsIn = ioMetrics.getNumRecordsIn();
    +                           numRecordsOut = ioMetrics.getNumRecordsOut();
    +                   } else { // execAttempt is still running, use MetricQueryService instead
    +                           fetcher.update();
    +                           MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(vertex.getJobId().toString(), vertex.getJobvertexId().toString(), vertex.getParallelSubtaskIndex());
    +                           if (metrics != null) {
    +                                   numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
    +                                   numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
    +                                   numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
    +                                   numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
    --- End diff --
    
    The metric names are used in many places. I think we should use constants for them.
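
As a rough sketch of that suggestion (not code from the pull request), the names could live in one final class and be referenced from the handlers. The class name `MetricNames`, its field names, and the `readLong` helper are hypothetical. The plain `Map` only stands in for `MetricStore.SubtaskMetricStore`, whose `getMetric(name, "0")` call is what the diff above uses.

```java
import java.util.Map;

/** Hypothetical holder for the metric names that the diff repeats as string literals. */
final class MetricNames {
    static final String NUM_BYTES_IN_LOCAL = "numBytesInLocal";
    static final String NUM_BYTES_IN_REMOTE = "numBytesInRemote";
    static final String NUM_BYTES_OUT = "numBytesOut";
    static final String NUM_RECORDS_IN = "numRecordsIn";
    static final String NUM_RECORDS_OUT = "numRecordsOut";

    private MetricNames() {
        // constants only, no instances
    }

    /**
     * Hypothetical helper: reads a metric as a long from a plain map,
     * returning 0 when the metric is absent. The map is an assumed
     * stand-in for the metric store used in the handler.
     */
    static long readLong(Map<String, String> metrics, String name) {
        String value = metrics.get(name);
        return value == null ? 0L : Long.parseLong(value);
    }
}
```

With something like this in place, a handler line would presumably read `Long.valueOf(metrics.getMetric(MetricNames.NUM_BYTES_OUT, "0"))`, so a misspelled name fails at compile time instead of silently falling back to the default "0".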


> Port WebFrontend to new metric system
> -------------------------------------
>
>                 Key: FLINK-4733
>                 URL: https://issues.apache.org/jira/browse/FLINK-4733
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics, TaskManager, Webfrontend
>    Affects Versions: 1.1.2
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: 1.2.0
>
>
> While the WebFrontend has access to the metric system, it still relies on 
> older code in some parts.
> The TaskManager metrics are still gathered using the Codahale library and 
> sent with the heartbeats.
> Task-related metrics (numRecordsIn etc.) are still gathered using 
> accumulators, which are accessed through the execution graph.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
