Repository: beam Updated Branches: refs/heads/master 860ac1d1f -> 6ea4b3a42
[BEAM-1334] Split UsesMetrics category and tests into UsesCommittedMetrics and UsesAttemptedMetrics Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6e5c341b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6e5c341b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6e5c341b Branch: refs/heads/master Commit: 6e5c341b18e512616f51562c5c76992a0923ff42 Parents: 860ac1d Author: Aviem Zur <[email protected]> Authored: Sat Jan 28 17:08:58 2017 +0200 Committer: bchambers <[email protected]> Committed: Wed Feb 1 16:57:57 2017 -0800 ---------------------------------------------------------------------- runners/apex/pom.xml | 3 +- .../beam/runners/direct/DirectMetricsTest.java | 58 +++++++++---- runners/flink/runner/pom.xml | 6 +- runners/google-cloud-dataflow-java/pom.xml | 3 +- runners/spark/pom.xml | 3 +- .../beam/sdk/testing/UsesAttemptedMetrics.java | 28 ++++++ .../beam/sdk/testing/UsesCommittedMetrics.java | 28 ++++++ .../apache/beam/sdk/testing/UsesMetrics.java | 24 ------ .../apache/beam/sdk/metrics/MetricMatchers.java | 90 ++++++++++++++------ .../apache/beam/sdk/metrics/MetricsTest.java | 75 +++++++++++----- 10 files changed, 224 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 5e16083..002bf9a 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -188,7 +188,8 @@ org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, - org.apache.beam.sdk.testing.UsesMetrics + org.apache.beam.sdk.testing.UsesAttemptedMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics </excludedGroups> <parallel>none</parallel> <failIfNoTests>true</failIfNoTests> http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java index df01244..3ad2bdc 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -18,7 +18,8 @@ package org.apache.beam.runners.direct; -import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult; +import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; +import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult; import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -63,8 +64,9 @@ public class DirectMetricsTest { MockitoAnnotations.initMocks(this); } + @SuppressWarnings("unchecked") @Test - public void testApplyLogicalQueryNoFilter() { + public void testApplyCommittedNoFilter() { metrics.commitLogical(bundle1, MetricUpdates.create( ImmutableList.of( MetricUpdate.create(MetricKey.create("step1", NAME1), 5L), @@ -82,17 +84,22 @@ public class DirectMetricsTest { MetricQueryResults results = metrics.queryMetrics(MetricsFilter.builder().build()); assertThat(results.counters(), containsInAnyOrder( - metricResult("ns1", "name1", "step1", 5L, 0L), - metricResult("ns1", "name2", "step1", 12L, 0L), - metricResult("ns1", "name1", "step2", 7L, 0L))); + attemptedMetricsResult("ns1", "name1", "step1", 0L), + attemptedMetricsResult("ns1", "name2", "step1", 0L), + attemptedMetricsResult("ns1", "name1", "step2", 0L))); + assertThat(results.counters(), containsInAnyOrder( + committedMetricsResult("ns1", "name1", "step1", 5L), + committedMetricsResult("ns1", "name2", "step1", 12L), + committedMetricsResult("ns1", "name1", "step2", 7L))); + assertThat(results.distributions(), contains( + attemptedMetricsResult("ns1", "name1", "step1", DistributionResult.ZERO))); assertThat(results.distributions(), contains( - metricResult("ns1", "name1", "step1", - DistributionResult.create(12, 3, 3, 5), - DistributionResult.ZERO))); + committedMetricsResult("ns1", "name1", "step1", DistributionResult.create(12, 3, 3, 5)))); } + @SuppressWarnings("unchecked") @Test - public void testApplyPhysicalCountersQueryOneNamespace() { + public void testApplyAttemptedCountersQueryOneNamespace() { metrics.updatePhysical(bundle1, MetricUpdates.create( ImmutableList.of( MetricUpdate.create(MetricKey.create("step1", NAME1), 5L), @@ -104,15 +111,23 @@ public class DirectMetricsTest { MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)), ImmutableList.<MetricUpdate<DistributionData>>of())); - assertThat(metrics.queryMetrics( - MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build()).counters(), + MetricQueryResults results = metrics.queryMetrics( + MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build()); + + assertThat(results.counters(), + containsInAnyOrder( + attemptedMetricsResult("ns1", "name1", "step1", 5L), + attemptedMetricsResult("ns1", "name1", "step2", 7L))); + + assertThat(results.counters(), containsInAnyOrder( - metricResult("ns1", "name1", "step1", 0L, 5L), - metricResult("ns1", "name1", "step2", 0L, 7L))); + committedMetricsResult("ns1", "name1", "step1", 0L), + committedMetricsResult("ns1", "name1", "step2", 0L))); } + @SuppressWarnings("unchecked") @Test - public void testApplyPhysicalQueryCompositeScope() { + public void testApplyAttemptedQueryCompositeScope() { metrics.updatePhysical(bundle1, MetricUpdates.create( ImmutableList.of( MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L), @@ -124,10 +139,17 @@ public class DirectMetricsTest { MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)), ImmutableList.<MetricUpdate<DistributionData>>of())); - assertThat(metrics.queryMetrics( - MetricsFilter.builder().addStep("Outer1").build()).counters(), + MetricQueryResults results = metrics.queryMetrics( + MetricsFilter.builder().addStep("Outer1").build()); + + assertThat(results.counters(), + containsInAnyOrder( + attemptedMetricsResult("ns1", "name1", "Outer1/Inner1", 12L), + attemptedMetricsResult("ns1", "name1", "Outer1/Inner2", 8L))); + + assertThat(results.counters(), containsInAnyOrder( - metricResult("ns1", "name1", "Outer1/Inner1", 0L, 12L), - metricResult("ns1", "name1", "Outer1/Inner2", 0L, 8L))); + committedMetricsResult("ns1", "name1", "Outer1/Inner1", 0L), + committedMetricsResult("ns1", "name1", "Outer1/Inner2", 0L))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/flink/runner/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index 6a7cbff..fe058b5 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -58,7 +58,8 @@ org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, - org.apache.beam.sdk.testing.UsesMetrics + org.apache.beam.sdk.testing.UsesAttemptedMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics </excludedGroups> <parallel>none</parallel> <failIfNoTests>true</failIfNoTests> @@ -90,7 +91,8 @@ org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, - org.apache.beam.sdk.testing.UsesMetrics + org.apache.beam.sdk.testing.UsesAttemptedMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics </excludedGroups> <parallel>none</parallel> <failIfNoTests>true</failIfNoTests> http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index f17eb78..35f31b9 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -78,7 +78,8 @@ <id>runnable-on-service-tests</id> <configuration> <excludedGroups> - org.apache.beam.sdk.testing.UsesMetrics, + org.apache.beam.sdk.testing.UsesAttemptedMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics, org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesUnboundedPCollections, http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 5d46f8d..ffe6dfd 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -77,7 +77,8 @@ org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, - org.apache.beam.sdk.testing.UsesMetrics + org.apache.beam.sdk.testing.UsesAttemptedMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics </excludedGroups> <forkCount>1</forkCount> <reuseForks>false</reuseForks> http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesAttemptedMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesAttemptedMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesAttemptedMetrics.java new file mode 100644 index 0000000..0a17980 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesAttemptedMetrics.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.metrics.MetricResult; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Metrics}. + * Tests tagged with {@link UsesAttemptedMetrics} should be run for runners which support + * {@link MetricResult#attempted()}. + */ +public class UsesAttemptedMetrics {} http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCommittedMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCommittedMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCommittedMetrics.java new file mode 100644 index 0000000..c22f397 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCommittedMetrics.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.metrics.MetricResult; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Metrics}. + * Tests tagged with {@link UsesCommittedMetrics} should be run for runners which support + * {@link MetricResult#committed()}. + */ +public interface UsesCommittedMetrics {} http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java deleted file mode 100644 index 261354c..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.testing; - -/** - * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Metrics}. - */ -public interface UsesMetrics {} http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java index 798d9d4..b8109b3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java @@ -29,7 +29,7 @@ import org.hamcrest.TypeSafeMatcher; */ public class MetricMatchers { - public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, final T update) { + static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, final T update) { return new TypeSafeMatcher<MetricUpdate<T>>() { @Override protected boolean matchesSafely(MetricUpdate<T> item) { @@ -47,7 +47,7 @@ public class MetricMatchers { }; } - public static <T> Matcher<MetricUpdate<T>> metricUpdate( + static <T> Matcher<MetricUpdate<T>> metricUpdate( final String namespace, final String name, final String step, final T update) { return new TypeSafeMatcher<MetricUpdate<T>>() { @Override @@ -70,16 +70,14 @@ public class MetricMatchers { }; } - public static <T> Matcher<MetricResult<T>> metricResult( - final String namespace, final String name, final String step, - final T committed, final T attempted) { + public static <T> Matcher<MetricResult<T>> attemptedMetricsResult( + final String namespace, final String name, final String step, final T attempted) { return new TypeSafeMatcher<MetricResult<T>>() { @Override protected boolean matchesSafely(MetricResult<T> item) { return Objects.equals(namespace, item.name().namespace()) && Objects.equals(name, item.name().name()) && item.step().contains(step) - && Objects.equals(committed, item.committed()) && Objects.equals(attempted, item.attempted()); } @@ -89,7 +87,6 @@ public class MetricMatchers { .appendText("MetricResult{inNamespace=").appendValue(namespace) .appendText(", name=").appendValue(name) .appendText(", step=").appendValue(step) - .appendText(", committed=").appendValue(committed) .appendText(", attempted=").appendValue(attempted) .appendText("}"); } @@ -97,38 +94,81 @@ public class MetricMatchers { @Override protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) { mismatchDescription.appendText("MetricResult{"); - if (!Objects.equals(namespace, item.name().namespace())) { - mismatchDescription - .appendText("inNamespace: ").appendValue(namespace) - .appendText(" != ").appendValue(item.name().namespace()); - } - if (!Objects.equals(name, item.name().name())) { - mismatchDescription - .appendText("name: ").appendValue(name) - .appendText(" != ").appendValue(item.name().name()); - } + describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); - if (!item.step().contains(step)) { + if (!Objects.equals(attempted, item.attempted())) { mismatchDescription - .appendText("step: ").appendValue(step) - .appendText(" != ").appendValue(item.step()); + .appendText("attempted: ").appendValue(attempted) + .appendText(" != ").appendValue(item.attempted()); } + mismatchDescription.appendText("}"); + } + }; + } + + public static <T> Matcher<MetricResult<T>> committedMetricsResult( + final String namespace, final String name, final String step, + final T committed) { + return new TypeSafeMatcher<MetricResult<T>>() { + @Override + protected boolean matchesSafely(MetricResult<T> item) { + return Objects.equals(namespace, item.name().namespace()) + && Objects.equals(name, item.name().name()) + && item.step().contains(step) + && Objects.equals(committed, item.committed()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricResult{inNamespace=").appendValue(namespace) + .appendText(", name=").appendValue(name) + .appendText(", step=").appendValue(step) + .appendText(", committed=").appendValue(committed) + .appendText("}"); + } + + @Override + protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) { + mismatchDescription.appendText("MetricResult{"); + + describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); + if (!Objects.equals(committed, item.committed())) { mismatchDescription .appendText("committed: ").appendValue(committed) .appendText(" != ").appendValue(item.committed()); } - if (!Objects.equals(attempted, item.attempted())) { - mismatchDescription - .appendText("attempted: ").appendValue(attempted) - .appendText(" != ").appendValue(item.attempted()); - } mismatchDescription.appendText("}"); } }; } + private static <T> void describeMetricsResultMembersMismatch( + MetricResult<T> item, + Description mismatchDescription, + String namespace, + String name, + String step) { + if (!Objects.equals(namespace, item.name().namespace())) { + mismatchDescription + .appendText("inNamespace: ").appendValue(namespace) + .appendText(" != ").appendValue(item.name().namespace()); + } + + if (!Objects.equals(name, item.name().name())) { + mismatchDescription + .appendText("name: ").appendValue(name) + .appendText(" != ").appendValue(item.name().name()); + } + + if (!item.step().contains(step)) { + mismatchDescription + .appendText("step: ").appendValue(step) + .appendText(" != ").appendValue(item.step()); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 075df19..9ad0935 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -18,7 +18,8 @@ package org.apache.beam.sdk.metrics; -import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult; +import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; +import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertNull; @@ -29,7 +30,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.UsesMetrics; +import org.apache.beam.sdk.testing.UsesAttemptedMetrics; +import org.apache.beam.sdk.testing.UsesCommittedMetrics; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -108,14 +110,59 @@ public class MetricsTest implements Serializable { assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L)); } - @Category({RunnableOnService.class, UsesMetrics.class}) + @Category({RunnableOnService.class, UsesCommittedMetrics.class}) @Test - public void metricsReportToQuery() { + public void committedMetricsReportToQuery() { + PipelineResult result = runPipelineWithMetrics(); + + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + + assertThat(metrics.counters(), hasItem( + committedMetricsResult(MetricsTest.class.getName(), "count", "MyStep1", 3L))); + assertThat(metrics.distributions(), hasItem( + committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1", + DistributionResult.create(26L, 3L, 5L, 13L)))); + + assertThat(metrics.counters(), hasItem( + committedMetricsResult(MetricsTest.class.getName(), "count", "MyStep2", 6L))); + assertThat(metrics.distributions(), hasItem( + committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2", + DistributionResult.create(52L, 6L, 5L, 13L)))); + } + + + @Category({RunnableOnService.class, UsesAttemptedMetrics.class}) + @Test + public void attemptedMetricsReportToQuery() { + PipelineResult result = runPipelineWithMetrics(); + + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + + // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. + assertThat(metrics.counters(), hasItem( + attemptedMetricsResult(MetricsTest.class.getName(), "count", "MyStep1", 3L))); + assertThat(metrics.distributions(), hasItem( + attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1", + DistributionResult.create(26L, 3L, 5L, 13L)))); + + assertThat(metrics.counters(), hasItem( + attemptedMetricsResult(MetricsTest.class.getName(), "count", "MyStep2", 6L))); + assertThat(metrics.distributions(), hasItem( + attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2", + DistributionResult.create(52L, 6L, 5L, 13L)))); + } + + private PipelineResult runPipelineWithMetrics() { final Counter count = Metrics.counter(MetricsTest.class, "count"); Pipeline pipeline = TestPipeline.create(); pipeline .apply(Create.of(5, 8, 13)) .apply("MyStep1", ParDo.of(new DoFn<Integer, Integer>() { + @SuppressWarnings("unused") @ProcessElement public void processElement(ProcessContext c) { Distribution values = Metrics.distribution(MetricsTest.class, "input"); @@ -127,6 +174,7 @@ public class MetricsTest implements Serializable { } })) .apply("MyStep2", ParDo.of(new DoFn<Integer, Integer>() { + @SuppressWarnings("unused") @ProcessElement public void processElement(ProcessContext c) { Distribution values = Metrics.distribution(MetricsTest.class, "input"); @@ -137,23 +185,6 @@ public class MetricsTest implements Serializable { PipelineResult result = pipeline.run(); result.waitUntilFinish(); - - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. - assertThat(metrics.counters(), hasItem( - metricResult(MetricsTest.class.getName(), "count", "MyStep1", 3L, 3L))); - assertThat(metrics.distributions(), hasItem( - metricResult(MetricsTest.class.getName(), "input", "MyStep1", - DistributionResult.create(26L, 3L, 5L, 13L), - DistributionResult.create(26L, 3L, 5L, 13L)))); - - assertThat(metrics.counters(), hasItem( - metricResult(MetricsTest.class.getName(), "count", "MyStep2", 6L, 6L))); - assertThat(metrics.distributions(), hasItem( - metricResult(MetricsTest.class.getName(), "input", "MyStep2", - DistributionResult.create(52L, 6L, 5L, 13L), - DistributionResult.create(52L, 6L, 5L, 13L)))); + return result; } }
