Repository: beam Updated Branches: refs/heads/master 884935cb9 -> fdf2de999
[BEAM-2096] Make DataflowMetrics more resilient DataflowMetrics seems to have many hard-coded assumptions about what will be returned by the Dataflow service, which will likely break when users use new types of metrics or if the Dataflow service makes minor adjustments in how it sends metrics back to the user. In order for the code to continue working in these cases, handle errors by logging and skipping. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e047b69e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e047b69e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e047b69e Branch: refs/heads/master Commit: e047b69efab9c988011303cf2eda86ac408b38c2 Parents: 884935c Author: Dan Halperin <[email protected]> Authored: Thu Apr 27 08:30:47 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu Apr 27 13:10:22 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowMetrics.java | 38 ++++++++++++-------- 1 file changed, 24 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e047b69e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 7633a56..aa80959 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -136,22 +136,32 @@ class DataflowMetrics extends MetricResults { ImmutableList.builder(); 
ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder(); for (MetricKey metricKey : metricHashKeys) { - String metricName = metricKey.metricName().name(); - if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]") - || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) { - // Skip distribution metrics, as these are not yet properly supported. - LOG.warn("Distribution metrics are not yet supported. You can see them in the Dataflow" - + "User Interface"); + if (!MetricFiltering.matches(filter, metricKey)) { + // Skip unmatched metrics early. continue; } - String namespace = metricKey.metricName().namespace(); - String step = metricKey.stepName(); - Long committed = ((Number) committedByName.get(metricKey).getScalar()).longValue(); - Long attempted = ((Number) tentativeByName.get(metricKey).getScalar()).longValue(); - if (MetricFiltering.matches(filter, metricKey)) { - counterResults.add(DataflowMetricResult.create( - MetricName.named(namespace, metricName), - step, committed, attempted)); + + // This code is not robust to evolutions in the types of metrics that can be returned, so + // wrap it in a try-catch and log errors. + try { + String metricName = metricKey.metricName().name(); + if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]") + || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) { + // Skip distribution metrics, as these are not yet properly supported. + LOG.warn("Distribution metrics are not yet supported. 
You can see them in the Dataflow" + + "User Interface"); + continue; + } + + String namespace = metricKey.metricName().namespace(); + String step = metricKey.stepName(); + Long committed = ((Number) committedByName.get(metricKey).getScalar()).longValue(); + Long attempted = ((Number) tentativeByName.get(metricKey).getScalar()).longValue(); + counterResults.add( + DataflowMetricResult.create( + MetricName.named(namespace, metricName), step, committed, attempted)); + } catch (Exception e) { + LOG.warn("Error handling metric {} for filter {}, skipping result.", metricKey, filter); } } return DataflowMetricQueryResults.create(
