[ https://issues.apache.org/jira/browse/BEAM-5246?focusedWorklogId=143724&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143724 ]
ASF GitHub Bot logged work on BEAM-5246:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Sep/18 19:46
Start Date: 12/Sep/18 19:46
Worklog Time Spent: 10m
Work Description: asfgit closed pull request #6319: [BEAM-5246] Metric
container should forward only metrics of the stepName
URL: https://github.com/apache/beam/pull/6319
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index f5e3c34ca2d..024fc769645 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -90,7 +90,7 @@ public void finishBundle() {
}
// update metrics
- container.updateMetrics();
+ container.updateMetrics(stepName);
}
@Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index 384ffaa7850..73fe62e219f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -83,10 +83,10 @@ MetricsContainer getMetricsContainer(String stepName) {
: null;
}
- void updateMetrics() {
+ void updateMetrics(String stepName) {
MetricResults metricResults =
asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());
MetricQueryResults metricQueryResults =
- metricResults.queryMetrics(MetricsFilter.builder().build());
+ metricResults.queryMetrics(MetricsFilter.builder().addStep(stepName).build());
updateCounters(metricQueryResults.getCounters());
updateDistributions(metricQueryResults.getDistributions());
updateGauge(metricQueryResults.getGauges());
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
index 2a56afc533f..995baccb17e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
@@ -49,7 +49,7 @@ public boolean invokeStart(ReaderT reader) throws IOException {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
boolean result = reader.start();
- container.updateMetrics();
+ container.updateMetrics(stepName);
return result;
}
} else {
@@ -62,7 +62,7 @@ public boolean invokeAdvance(ReaderT reader) throws IOException {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
boolean result = reader.advance();
- container.updateMetrics();
+ container.updateMetrics(stepName);
return result;
}
} else {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 143724)
Time Spent: 1h 10m (was: 1h)
> Beam metrics exported as flink metrics are not correct
> ------------------------------------------------------
>
> Key: BEAM-5246
> URL: https://issues.apache.org/jira/browse/BEAM-5246
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.6.0
> Reporter: Jozef Vilcek
> Assignee: Jozef Vilcek
> Priority: Major
> Fix For: 2.7.0
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> In the Flink UI and Flink's native MetricReporter, I am seeing too many instances of
> my Beam metric counter. It looks like the counter is materialised for every
> operator running within the task, although it is emitted from only one Beam
> step (which should map to one operator?). This produces double counting.
> After a bit of debugging I noticed this happens for streaming jobs. In batch I was
> not able to reproduce it. The problem might be in FlinkMetricContainer.
> [https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java#L86]
> The update seems to be called from operators after finishing the bundle. Data
> from the accumulator are flushed to `runtimeContext.getMetricGroup()`. The scope
> of the accumulator seems to differ from that of the metricGroup: across
> different calls the scope components change, especially the operatorID. It
> seems that during the run, `metricResult.getStep()` does not match the
> operatorName of the metricGroup where the metric is being pushed.
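The double counting described above can be illustrated with a small, self-contained sketch. It does not use Beam or Flink classes: `StepFilterSketch`, `CounterResult`, `forward` and `totalReported` are hypothetical names, and the nested map only stands in for per-operator metric groups. It assumes that all operators in a task read the same accumulator, so an unfiltered flush copies every step's counter into every operator's group, while filtering by step name (the idea behind `updateMetrics(stepName)` in the diff) keeps each counter in one group only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; these are not Beam or Flink classes. */
final class StepFilterSketch {

  /** One counter value as recorded in the shared accumulator. */
  static final class CounterResult {
    final String step;
    final String name;
    final long value;

    CounterResult(String step, String name, long value) {
      this.step = step;
      this.name = name;
      this.value = value;
    }
  }

  /**
   * Simulates every operator flushing the shared accumulator into its own
   * metric group. Outer key: operator (step) name; inner key: counter name.
   * With filterByStep set, an operator forwards only its own step's counters.
   */
  static Map<String, Map<String, Long>> forward(
      List<CounterResult> accumulator, List<String> operators, boolean filterByStep) {
    Map<String, Map<String, Long>> metricGroups = new HashMap<>();
    for (String operator : operators) {
      Map<String, Long> group = metricGroups.computeIfAbsent(operator, k -> new HashMap<>());
      for (CounterResult counter : accumulator) {
        if (!filterByStep || counter.step.equals(operator)) {
          group.put(counter.name, counter.value);
        }
      }
    }
    return metricGroups;
  }

  /** Sums a counter across all metric groups, as a naive dashboard might. */
  static long totalReported(Map<String, Map<String, Long>> metricGroups, String counterName) {
    long total = 0;
    for (Map<String, Long> group : metricGroups.values()) {
      total += group.getOrDefault(counterName, 0L);
    }
    return total;
  }

  public static void main(String[] args) {
    List<CounterResult> accumulator =
        Arrays.asList(
            new CounterResult("stepA", "elements", 5L),
            new CounterResult("stepB", "errors", 1L));
    List<String> operators = Arrays.asList("stepA", "stepB");

    // Unfiltered: "elements" shows up under both operators.
    System.out.println(
        "unfiltered total: " + totalReported(forward(accumulator, operators, false), "elements"));
    // Filtered by step: "elements" shows up only under stepA.
    System.out.println(
        "filtered total: " + totalReported(forward(accumulator, operators, true), "elements"));
  }
}
```

With two operators in the task, the unfiltered run reports the `elements` counter twice (a total of 10 for a true value of 5), while the filtered run reports it once.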
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)