Repository: beam Updated Branches: refs/heads/master 1d9772a3a -> 59aa0dab7
Support for querying metrics in Dataflow Runner Added MetricFiltering class for helper methods related to matching step names. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6de412a5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6de412a5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6de412a5 Branch: refs/heads/master Commit: 6de412a5dfb3000ab5d354ada8761789230d3ce3 Parents: 1d9772a Author: Pablo <[email protected]> Authored: Fri Mar 10 16:10:31 2017 -0800 Committer: bchambers <[email protected]> Committed: Mon Mar 20 14:48:22 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectMetrics.java | 70 +----- .../beam/runners/direct/DirectMetricsTest.java | 86 ++----- .../beam/runners/dataflow/DataflowMetrics.java | 212 +++++++++++++++++ .../runners/dataflow/DataflowPipelineJob.java | 14 +- .../runners/dataflow/DataflowMetricsTest.java | 236 +++++++++++++++++++ .../beam/sdk/metrics/MetricFiltering.java | 99 ++++++++ .../beam/sdk/metrics/MetricFilteringTest.java | 72 ++++++ 7 files changed, 655 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index fa8f9c3..f04dc21 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -20,12 +20,10 @@ package org.apache.beam.runners.direct; import static java.util.Arrays.asList; import com.google.auto.value.AutoValue; 
-import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -35,9 +33,9 @@ import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricFiltering; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; @@ -258,7 +256,7 @@ class DirectMetrics extends MetricResults { MetricsFilter filter, ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder, Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) { - if (matches(filter, entry.getKey())) { + if (MetricFiltering.matches(filter, entry.getKey())) { resultsBuilder.add(DirectMetricResult.create( entry.getKey().metricName(), entry.getKey().stepName(), @@ -267,70 +265,6 @@ class DirectMetrics extends MetricResults { } } - // Matching logic is implemented here rather than in MetricsFilter because we would like - // MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with - // a Proto/JSON/etc. schema object. - private boolean matches(MetricsFilter filter, MetricKey key) { - return matchesName(key.metricName(), filter.names()) - && matchesScope(key.stepName(), filter.steps()); - } - - /** - * {@code subPathMatches(haystack, needle)} returns true if {@code needle} - * represents a path within {@code haystack}. 
For example, "foo/bar" is in "a/foo/bar/b", - * but not "a/fool/bar/b" or "a/foo/bart/b". - */ - public boolean subPathMatches(String haystack, String needle) { - int location = haystack.indexOf(needle); - int end = location + needle.length(); - if (location == -1) { - return false; // needle not found - } else if (location != 0 && haystack.charAt(location - 1) != '/') { - return false; // the first entry in needle wasn't exactly matched - } else if (end != haystack.length() && haystack.charAt(end) != '/') { - return false; // the last entry in needle wasn't exactly matched - } else { - return true; - } - } - - /** - * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched - * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A - * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D", or - * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */ - public boolean matchesScope(String actualScope, Set<String> scopes) { - if (scopes.isEmpty() || scopes.contains(actualScope)) { - return true; - } - - // If there is no perfect match, a stage name-level match is tried. - // This is done by a substring search over the levels of the scope. - // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C". - for (String scope : scopes) { - if (subPathMatches(actualScope, scope)) { - return true; - } - } - - return false; - } - - private boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) { - if (nameFilters.isEmpty()) { - return true; - } - - for (MetricNameFilter nameFilter : nameFilters) { - if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name())) - && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) { - return true; - } - } - - return false; - } - /** Apply metric updates that represent physical counter deltas to the current metric values. 
*/ public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) { for (MetricUpdate<Long> counter : updates.counterUpdates()) { http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java index 77229bf..7183124 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -23,13 +23,9 @@ import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult; import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; -import java.util.HashSet; -import java.util.Set; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; @@ -129,97 +125,63 @@ public class DirectMetricsTest { committedMetricsResult("ns1", "name1", "step2", 0L))); } - private boolean matchesSubPath(String actualScope, String subPath) { - return metrics.subPathMatches(actualScope, subPath); - } - - @Test - public void testMatchesSubPath() { - assertTrue("Match of the first element", - matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1")); - assertTrue("Match of the first elements", - matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1")); - assertTrue("Match of the last elements", - 
matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1")); - assertFalse("Substring match but no subpath match", - matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1")); - assertFalse("Substring match from start - but no subpath match", - matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top")); - } - - private boolean matchesScopeWithSingleFilter(String actualScope, String filter) { - Set<String> scopeFilter = new HashSet<String>(); - scopeFilter.add(filter); - return metrics.matchesScope(actualScope, scopeFilter); - } - - @Test - public void testMatchesScope() { - assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1")); - assertTrue(matchesScopeWithSingleFilter( - "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1")); - assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1")); - assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1")); - assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1")); - assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn")); - } - @SuppressWarnings("unchecked") @Test - public void testPartialScopeMatchingInMetricsQuery() { + public void testApplyAttemptedQueryCompositeScope() { metrics.updatePhysical(bundle1, MetricUpdates.create( ImmutableList.of( - MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L), - MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)), + MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L), + MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)), ImmutableList.<MetricUpdate<DistributionData>>of())); metrics.updatePhysical(bundle1, MetricUpdates.create( ImmutableList.of( - MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L), - MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)), + MetricUpdate.create(MetricKey.create("Outer1/Inner1", 
NAME1), 12L), + MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)), ImmutableList.<MetricUpdate<DistributionData>>of())); MetricQueryResults results = metrics.queryMetrics( - MetricsFilter.builder().addStep("Top1/Outer1").build()); + MetricsFilter.builder().addStep("Outer1").build()); assertThat(results.counters(), containsInAnyOrder( - attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L), - attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L))); - - results = metrics.queryMetrics( - MetricsFilter.builder().addStep("Inner2").build()); + attemptedMetricsResult("ns1", "name1", "Outer1/Inner1", 12L), + attemptedMetricsResult("ns1", "name1", "Outer1/Inner2", 8L))); assertThat(results.counters(), containsInAnyOrder( - attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L), - attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L))); + committedMetricsResult("ns1", "name1", "Outer1/Inner1", 0L), + committedMetricsResult("ns1", "name1", "Outer1/Inner2", 0L))); } + @SuppressWarnings("unchecked") @Test - public void testApplyAttemptedQueryCompositeScope() { + public void testPartialScopeMatchingInMetricsQuery() { metrics.updatePhysical(bundle1, MetricUpdates.create( ImmutableList.of( - MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L), - MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)), + MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L), + MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)), ImmutableList.<MetricUpdate<DistributionData>>of())); metrics.updatePhysical(bundle1, MetricUpdates.create( ImmutableList.of( - MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L), - MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)), + MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L), + MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)), 
ImmutableList.<MetricUpdate<DistributionData>>of())); MetricQueryResults results = metrics.queryMetrics( - MetricsFilter.builder().addStep("Outer1").build()); + MetricsFilter.builder().addStep("Top1/Outer1").build()); assertThat(results.counters(), containsInAnyOrder( - attemptedMetricsResult("ns1", "name1", "Outer1/Inner1", 12L), - attemptedMetricsResult("ns1", "name1", "Outer1/Inner2", 8L))); + attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L), + attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L))); + + results = metrics.queryMetrics( + MetricsFilter.builder().addStep("Inner2").build()); assertThat(results.counters(), containsInAnyOrder( - committedMetricsResult("ns1", "name1", "Outer1/Inner1", 0L), - committedMetricsResult("ns1", "name1", "Outer1/Inner2", 0L))); + attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L), + attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java new file mode 100644 index 0000000..c0d1883 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import com.google.api.services.dataflow.model.JobMetrics; +import com.google.auto.value.AutoValue; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricFiltering; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link MetricResults} for the Dataflow Runner. + */ +class DataflowMetrics extends MetricResults { + private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class); + /** + * Client for the Dataflow service. This can be used to query the service + * for information about the job. + */ + private DataflowClient dataflowClient; + + /** + * PipelineResult implementation for Dataflow Runner. It contains job state and id information. + */ + private DataflowPipelineJob dataflowPipelineJob; + + /** + * After the job has finished running, Metrics no longer will change, so their results are + * cached here. 
+ */ + private MetricQueryResults cachedMetricResults = null; + + /** + * Constructor for the DataflowMetrics class. + * @param dataflowPipelineJob is used to get Job state and Job ID information. + * @param dataflowClient is used to query user metrics from the Dataflow service. + */ + public DataflowMetrics(DataflowPipelineJob dataflowPipelineJob, DataflowClient dataflowClient) { + this.dataflowClient = dataflowClient; + this.dataflowPipelineJob = dataflowPipelineJob; + } + + /** + * Build an immutable map that serves as a hash key for a metric update. + * @return a {@link MetricKey} that can be hashed and used to identify a metric. + */ + private MetricKey metricHashKey( + com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + String fullStepName = metricUpdate.getName().getContext().get("step"); + fullStepName = (dataflowPipelineJob.aggregatorTransforms != null + ? dataflowPipelineJob.aggregatorTransforms + .getAppliedTransformForStepName(fullStepName).getFullName() : fullStepName); + return MetricKey.create( + fullStepName, + MetricName.named( + metricUpdate.getName().getContext().get("namespace"), + metricUpdate.getName().getName())); + } + + /** + * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative + * update or not. + * @return true if update is tentative, false otherwise + */ + private boolean isMetricTentative( + com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + return (metricUpdate.getName().getContext().containsKey("tentative") + && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true")); + } + + /** + * Take a list of metric updates coming from the Dataflow service, and format it into a + * Metrics API MetricQueryResults instance. + * @param metricUpdates + * @return a populated MetricQueryResults object. 
+ */ + private MetricQueryResults populateMetricQueryResults( + List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates, + MetricsFilter filter) { + // Separate metric updates by name and by tentative/committed. + HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate> + tentativeByName = new HashMap<>(); + HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate> + committedByName = new HashMap<>(); + HashSet<MetricKey> metricHashKeys = new HashSet<>(); + + // If the Context of the metric update does not have a namespace, then these are not + // actual metrics counters. + for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) { + if (Objects.equal(update.getName().getOrigin(), "user") && isMetricTentative(update) + && update.getName().getContext().containsKey("namespace")) { + tentativeByName.put(metricHashKey(update), update); + metricHashKeys.add(metricHashKey(update)); + } else if (Objects.equal(update.getName().getOrigin(), "user") + && update.getName().getContext().containsKey("namespace") + && !isMetricTentative(update)) { + committedByName.put(metricHashKey(update), update); + metricHashKeys.add(metricHashKey(update)); + } + } + // Create the lists with the metric result information. + ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder(); + ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults = + ImmutableList.builder(); + for (MetricKey metricKey : metricHashKeys) { + String metricName = metricKey.metricName().name(); + if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]") + || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) { + // Skip distribution metrics, as these are not yet properly supported. + LOG.warn("Distribution metrics are not yet supported. 
You can see them in the Dataflow " + "User Interface"); + continue; + } + String namespace = metricKey.metricName().namespace(); + String step = metricKey.stepName(); + Long committed = ((Number) committedByName.get(metricKey).getScalar()).longValue(); + Long attempted = ((Number) tentativeByName.get(metricKey).getScalar()).longValue(); + if (MetricFiltering.matches(filter, metricKey)) { + counterResults.add(DataflowMetricResult.create( + MetricName.named(namespace, metricName), + step, committed, attempted)); + } + } + return DataflowMetricQueryResults.create(counterResults.build(), distributionResults.build()); + } + + private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) { + List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates; + ImmutableList<MetricResult<Long>> counters = ImmutableList.of(); + ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of(); + JobMetrics jobMetrics; + try { + jobMetrics = dataflowClient.getJobMetrics(dataflowPipelineJob.jobId); + } catch (IOException e) { + LOG.warn("Unable to query job metrics."); + return DataflowMetricQueryResults.create(counters, distributions); + } + metricUpdates = jobMetrics.getMetrics(); + return populateMetricQueryResults(metricUpdates, filter); + } + + public MetricQueryResults queryMetrics() { + return queryMetrics(null); + } + + @Override + public MetricQueryResults queryMetrics(MetricsFilter filter) { + if (cachedMetricResults != null) { + // Metric results have been cached after the job ran. + return cachedMetricResults; + } + MetricQueryResults result = queryServiceForMetrics(filter); + if (dataflowPipelineJob.getState().isTerminal()) { + // Add current query result to the cache. 
+ cachedMetricResults = result; + } + return result; + } + + @AutoValue + abstract static class DataflowMetricQueryResults implements MetricQueryResults { + public static MetricQueryResults create( + Iterable<MetricResult<Long>> counters, + Iterable<MetricResult<DistributionResult>> distributions) { + return new AutoValue_DataflowMetrics_DataflowMetricQueryResults(counters, distributions); + } + } + + @AutoValue + abstract static class DataflowMetricResult<T> implements MetricResult<T> { + // need to define these here so they appear in the correct order + // and the generated constructor is usable and consistent + public abstract MetricName name(); + public abstract String step(); + public abstract T committed(); + public abstract T attempted(); + + public static <T> MetricResult<T> create(MetricName name, String scope, + T committed, T attempted) { + return new AutoValue_DataflowMetrics_DataflowMetricResult<T>( + name, scope, committed, attempted); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 950a9d3..1112fbb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -57,7 +57,7 @@ public class DataflowPipelineJob implements PipelineResult { /** * The id for the job. */ - private String jobId; + protected String jobId; /** * The {@link DataflowPipelineOptions} for the job. 
@@ -71,6 +71,12 @@ public class DataflowPipelineJob implements PipelineResult { private final DataflowClient dataflowClient; /** + * MetricResults object for Dataflow Runner. It allows for querying of metrics from the Dataflow + * service. + */ + private final DataflowMetrics dataflowMetrics; + + /** * The state the job terminated in or {@code null} if the job has not terminated. */ @Nullable @@ -82,7 +88,7 @@ public class DataflowPipelineJob implements PipelineResult { @Nullable private DataflowPipelineJob replacedByJob = null; - private DataflowAggregatorTransforms aggregatorTransforms; + protected DataflowAggregatorTransforms aggregatorTransforms; /** * The Metric Updates retrieved after the job was in a terminal state. @@ -129,6 +135,7 @@ public class DataflowPipelineJob implements PipelineResult { this.dataflowOptions = dataflowOptions; this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions)); this.aggregatorTransforms = aggregatorTransforms; + this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient); } /** @@ -462,8 +469,7 @@ public class DataflowPipelineJob implements PipelineResult { @Override public MetricResults metrics() { - throw new UnsupportedOperationException( - "The DataflowRunner does not currently support metrics."); + return dataflowMetrics; } private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator) http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java new file mode 100644 index 0000000..1017978 --- /dev/null +++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; +import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricStructuredName; +import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.math.BigDecimal; +import 
org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.NoopPathValidator; +import org.apache.beam.sdk.util.TestCredential; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link DataflowMetrics}. + */ +@RunWith(JUnit4.class) +public class DataflowMetricsTest { + private static final String PROJECT_ID = "some-project"; + private static final String JOB_ID = "1234"; + private static final String REGION_ID = "some-region"; + private static final String REPLACEMENT_JOB_ID = "4321"; + + @Mock + private Dataflow mockWorkflowClient; + @Mock + private Dataflow.Projects mockProjects; + @Mock + private Dataflow.Projects.Locations mockLocations; + @Mock + private Dataflow.Projects.Locations.Jobs mockJobs; + + private TestDataflowPipelineOptions options; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + + when(mockWorkflowClient.projects()).thenReturn(mockProjects); + when(mockProjects.locations()).thenReturn(mockLocations); + when(mockLocations.jobs()).thenReturn(mockJobs); + + options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); + options.setDataflowClient(mockWorkflowClient); + options.setProject(PROJECT_ID); + options.setRunner(DataflowRunner.class); + options.setTempLocation("gs://fakebucket/temp"); + options.setPathValidatorClass(NoopPathValidator.class); + options.setGcpCredential(new TestCredential()); + } + + @Test + public void testEmptyMetricUpdates() throws IOException { + Job modelJob = new Job(); + modelJob.setCurrentState(State.RUNNING.toString()); + + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + 
when(job.getState()).thenReturn(State.RUNNING); + job.jobId = JOB_ID; + + JobMetrics jobMetrics = new JobMetrics(); + jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of()); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.queryMetrics(); + assertThat(ImmutableList.copyOf(result.counters()), is(empty())); + assertThat(ImmutableList.copyOf(result.distributions()), is(empty())); + } + + @Test + public void testCachingMetricUpdates() throws IOException { + Job modelJob = new Job(); + modelJob.setCurrentState(State.RUNNING.toString()); + + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + when(job.getState()).thenReturn(State.DONE); + job.jobId = JOB_ID; + + JobMetrics jobMetrics = new JobMetrics(); + jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of()); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + verify(dataflowClient, times(0)).getJobMetrics(JOB_ID); + dataflowMetrics.queryMetrics(null); + verify(dataflowClient, times(1)).getJobMetrics(JOB_ID); + dataflowMetrics.queryMetrics(null); + verify(dataflowClient, times(1)).getJobMetrics(JOB_ID); + } + + private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step, + long scalar, boolean tentative) { + MetricUpdate update = new MetricUpdate(); + update.setScalar(new BigDecimal(scalar)); + + MetricStructuredName structuredName = new MetricStructuredName(); + structuredName.setName(name); + structuredName.setOrigin("user"); + ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<String, String>(); + contextBuilder.put("step", step) + .put("namespace", namespace); + if (tentative) { + 
contextBuilder.put("tentative", "true"); + } + structuredName.setContext(contextBuilder.build()); + update.setName(structuredName); + return update; + } + + @Test + public void testSingleCounterUpdates() throws IOException { + JobMetrics jobMetrics = new JobMetrics(); + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + when(job.getState()).thenReturn(State.RUNNING); + job.jobId = JOB_ID; + + MetricUpdate update = new MetricUpdate(); + long stepValue = 1234L; + update.setScalar(new BigDecimal(stepValue)); + + // The parser relies on the fact that one tentative and one committed metric update exist in + // the job metrics results. + MetricUpdate mu1 = makeCounterMetricUpdate("counterName", "counterNamespace", + "s2", 1234L, false); + MetricUpdate mu1Tentative = makeCounterMetricUpdate("counterName", + "counterNamespace", "s2", 1233L, true); + jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative)); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.queryMetrics(null); + assertThat(result.counters(), containsInAnyOrder( + attemptedMetricsResult("counterNamespace", "counterName", "s2", 1233L))); + assertThat(result.counters(), containsInAnyOrder( + committedMetricsResult("counterNamespace", "counterName", "s2", 1234L))); + } + + @Test + public void testIgnoreDistributionButGetCounterUpdates() throws IOException { + JobMetrics jobMetrics = new JobMetrics(); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + when(job.getState()).thenReturn(State.RUNNING); + job.jobId = JOB_ID; + + // The parser relies on the fact that one tentative and one committed metric update exist in + // the job metrics results. 
+ jobMetrics.setMetrics(ImmutableList.of( + makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false), + makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true), + makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, false), + makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, true))); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.queryMetrics(null); + assertThat(result.counters(), containsInAnyOrder( + attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L))); + assertThat(result.counters(), containsInAnyOrder( + committedMetricsResult("counterNamespace", "counterName", "s2", 1233L))); + } + + @Test + public void testMultipleCounterUpdates() throws IOException { + JobMetrics jobMetrics = new JobMetrics(); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + when(job.getState()).thenReturn(State.RUNNING); + job.jobId = JOB_ID; + + // The parser relies on the fact that one tentative and one committed metric update exist in + // the job metrics results. 
+ jobMetrics.setMetrics(ImmutableList.of( + makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false), + makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true), + makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, false), + makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, true), + makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1200L, false), + makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true))); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.queryMetrics(null); + assertThat(result.counters(), containsInAnyOrder( + attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L), + attemptedMetricsResult("otherNamespace", "otherCounter", "s3", 12L), + attemptedMetricsResult("otherNamespace", "counterName", "s4", 1233L))); + assertThat(result.counters(), containsInAnyOrder( + committedMetricsResult("counterNamespace", "counterName", "s2", 1233L), + committedMetricsResult("otherNamespace", "otherCounter", "s3", 12L), + committedMetricsResult("otherNamespace", "counterName", "s4", 1200L))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java new file mode 100644 index 0000000..a3e43e1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import com.google.common.base.Objects; +import java.util.Set; + +/** + * Implements matching for metrics filters. Specifically, matching for metric name, + * namespace, and step name. + */ +public class MetricFiltering { + + private MetricFiltering() { } + + /** Matching logic is implemented here rather than in MetricsFilter because we would like + * MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with + * a Proto/JSON/etc. schema object. + * @param filter {@link MetricsFilter} with the matching information of an actual metric + * @param key {@link MetricKey} with the information of a metric + * @return whether the filter matches the key or not + */ + public static boolean matches(MetricsFilter filter, MetricKey key) { + return filter == null + || (matchesName(key.metricName(), filter.names()) + && matchesScope(key.stepName(), filter.steps())); + } + + /** + * {@code subPathMatches(haystack, needle)} returns true if {@code needle} + * represents a path within {@code haystack}. For example, "foo/bar" is in "a/foo/bar/b", + * but not "a/fool/bar/b" or "a/foo/bart/b". 
+ */ + public static boolean subPathMatches(String haystack, String needle) { + int location = haystack.indexOf(needle); + int end = location + needle.length(); + if (location == -1) { + return false; // needle not found + } else if (location != 0 && haystack.charAt(location - 1) != '/') { + return false; // the first entry in needle wasn't exactly matched + } else if (end != haystack.length() && haystack.charAt(end) != '/') { + return false; // the last entry in needle wasn't exactly matched + } else { + return true; + } + } + + /** + * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched + * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A + * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D"), or + * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */ + public static boolean matchesScope(String actualScope, Set<String> scopes) { + if (scopes.isEmpty() || scopes.contains(actualScope)) { + return true; + } + + // If there is no perfect match, a stage name-level match is tried. + // This is done by a substring search over the levels of the scope. + // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C". 
+ for (String scope : scopes) { + if (subPathMatches(actualScope, scope)) { + return true; + } + } + + return false; + } + + private static boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) { + if (nameFilters.isEmpty()) { + return true; + } + for (MetricNameFilter nameFilter : nameFilters) { + if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name())) + && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) { + return true; + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java new file mode 100644 index 0000000..3e6a499 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.metrics; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link MetricFiltering}. + */ +@RunWith(JUnit4.class) +public class MetricFilteringTest { + private static final MetricName NAME1 = MetricName.named("ns1", "name1"); + + + private boolean matchesSubPath(String actualScope, String subPath) { + return MetricFiltering.subPathMatches(actualScope, subPath); + } + + @Test + public void testMatchesSubPath() { + assertTrue("Match of the first element", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1")); + assertTrue("Match of the first elements", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1")); + assertTrue("Match of the last elements", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1")); + assertFalse("Substring match but no subpath match", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1")); + assertFalse("Substring match from start - but no subpath match", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top")); + } + + private boolean matchesScopeWithSingleFilter(String actualScope, String filter) { + Set<String> scopeFilter = new HashSet<String>(); + scopeFilter.add(filter); + return MetricFiltering.matchesScope(actualScope, scopeFilter); + } + + @Test + public void testMatchesScope() { + assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1")); + assertTrue(matchesScopeWithSingleFilter( + "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1")); + assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1")); + assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1")); + assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1")); + 
assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn")); + } +}
