Repository: beam Updated Branches: refs/heads/master de9d89c1e -> f23dd6709
Make it possible to test runners that don't support all metrics Runners may only partially support metrics. This partial support runs along axes: - attempted & committed - counters & distributions & gauges & ... Prior to this PR, we have categories for the first axis, but not the second. This means that a runner that supports only part of the second axis has to blacklist tests on the first axis. Adding categories for both axes lets runners build a matrix of supported features. (This also lets the DataflowRunner run more tests by accurately identifying its test matrix.) * MetricsMatchers: refactor matchers for committed/attempted code reuse Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4faa8feb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4faa8feb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4faa8feb Branch: refs/heads/master Commit: 4faa8feba822db000b4b42636408245422ed324d Parents: de9d89c Author: Dan Halperin <[email protected]> Authored: Fri Apr 21 10:03:50 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Apr 26 13:48:42 2017 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 4 +- .../beam/sdk/testing/UsesCounterMetrics.java | 25 +++ .../sdk/testing/UsesDistributionMetrics.java | 26 ++++ .../beam/sdk/testing/UsesGaugeMetrics.java | 25 +++ .../apache/beam/sdk/metrics/MetricMatchers.java | 144 ++++++----------- .../apache/beam/sdk/metrics/MetricsTest.java | 154 ++++++++++++------- 6 files changed, 225 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 64dc71e..75aac43 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -122,8 +122,8 @@ <id>validates-runner-tests</id> <configuration> <excludedGroups> - org.apache.beam.sdk.testing.UsesAttemptedMetrics, - org.apache.beam.sdk.testing.UsesCommittedMetrics, + org.apache.beam.sdk.testing.UsesDistributionMetrics, + org.apache.beam.sdk.testing.UsesGaugeMetrics, org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesMapState, org.apache.beam.sdk.testing.UsesTimersInParDo, http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java new file mode 100644 index 0000000..0d0ed6f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Counter}. + * Tests tagged with {@link UsesCounterMetrics} should be run for runners which support counters. + */ +public class UsesCounterMetrics {} http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java new file mode 100644 index 0000000..6422024 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Distribution}. + * Tests tagged with {@link UsesDistributionMetrics} should be run for runners which support + * distributions. + */ +public class UsesDistributionMetrics {} http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java new file mode 100644 index 0000000..9d6455e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Gauge}. + * Tests tagged with {@link UsesGaugeMetrics} should be run for runners which support gauges. + */ +public class UsesGaugeMetrics {} http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java index 2251c82..a0dd119 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java @@ -81,61 +81,39 @@ public class MetricMatchers { } /** - * Matches a {@link MetricResult} with the given namespace, name and step, and whose attempted - * value equals the given value. + * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals + * the given value for attempted metrics. */ public static <T> Matcher<MetricResult<T>> attemptedMetricsResult( - final String namespace, final String name, final String step, final T attempted) { - return new TypeSafeMatcher<MetricResult<T>>() { - @Override - protected boolean matchesSafely(MetricResult<T> item) { - return Objects.equals(namespace, item.name().namespace()) - && Objects.equals(name, item.name().name()) - && item.step().contains(step) - && metricResultsEqual(attempted, item.attempted()); - } - - @Override - public void describeTo(Description description) { - description - .appendText("MetricResult{inNamespace=").appendValue(namespace) - .appendText(", name=").appendValue(name) - .appendText(", step=").appendValue(step) - .appendText(", attempted=").appendValue(attempted) - .appendText("}"); - } - - @Override - protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) { - mismatchDescription.appendText("MetricResult{"); - - describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); - - if (!Objects.equals(attempted, item.attempted())) { - mismatchDescription - .appendText("attempted: ").appendValue(attempted) - .appendText(" != ").appendValue(item.attempted()); - } - - mismatchDescription.appendText("}"); - } - }; + final String namespace, final String name, final String step, final T value) { + return metricsResult(namespace, name, step, value, false); } /** - * Matches a {@link MetricResult} with the given namespace, name and step, and whose committed - * value equals the given value. + * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals + * the given value for committed metrics. */ public static <T> Matcher<MetricResult<T>> committedMetricsResult( - final String namespace, final String name, final String step, - final T committed) { + final String namespace, final String name, final String step, final T value) { + return metricsResult(namespace, name, step, value, true); + } + + /** + * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals + * the given value for either committed or attempted (based on {@code isCommitted}) metrics. + */ + public static <T> Matcher<MetricResult<T>> metricsResult( + final String namespace, final String name, final String step, final T value, + final boolean isCommitted) { + final String metricState = isCommitted ? "committed" : "attempted"; return new TypeSafeMatcher<MetricResult<T>>() { @Override protected boolean matchesSafely(MetricResult<T> item) { + final T metricValue = isCommitted ? item.committed() : item.attempted(); return Objects.equals(namespace, item.name().namespace()) && Objects.equals(name, item.name().name()) && item.step().contains(step) - && metricResultsEqual(committed, item.committed()); + && metricResultsEqual(value, metricValue); } @Override @@ -144,20 +122,21 @@ public class MetricMatchers { .appendText("MetricResult{inNamespace=").appendValue(namespace) .appendText(", name=").appendValue(name) .appendText(", step=").appendValue(step) - .appendText(", committed=").appendValue(committed) + .appendText(String.format(", %s=", metricState)).appendValue(value) .appendText("}"); } @Override protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) { mismatchDescription.appendText("MetricResult{"); + final T metricValue = isCommitted ? item.committed() : item.attempted(); describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); - if (!Objects.equals(committed, item.committed())) { + if (!Objects.equals(value, metricValue)) { mismatchDescription - .appendText("committed: ").appendValue(committed) - .appendText(" != ").appendValue(item.committed()); + .appendText(String.format("%s: ", metricState)).appendValue(value) + .appendText(" != ").appendValue(metricValue); } mismatchDescription.appendText("}"); @@ -176,62 +155,28 @@ public class MetricMatchers { static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax( final String namespace, final String name, final String step, final Long attemptedMin, final Long attemptedMax) { - return new TypeSafeMatcher<MetricResult<DistributionResult>>() { - @Override - protected boolean matchesSafely(MetricResult<DistributionResult> item) { - return Objects.equals(namespace, item.name().namespace()) - && Objects.equals(name, item.name().name()) - && item.step().contains(step) - && Objects.equals(attemptedMin, item.attempted().min()) - && Objects.equals(attemptedMax, item.attempted().max()); - } - - @Override - public void describeTo(Description description) { - description - .appendText("MetricResult{inNamespace=").appendValue(namespace) - .appendText(", name=").appendValue(name) - .appendText(", step=").appendValue(step) - .appendText(", attemptedMin=").appendValue(attemptedMin) - .appendText(", attemptedMax=").appendValue(attemptedMax) - .appendText("}"); - } - - @Override - protected void describeMismatchSafely(MetricResult<DistributionResult> item, - Description mismatchDescription) { - mismatchDescription.appendText("MetricResult{"); - - describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); - - if (!Objects.equals(attemptedMin, item.attempted())) { - mismatchDescription - .appendText("attemptedMin: ").appendValue(attemptedMin) - .appendText(" != ").appendValue(item.attempted()); - } - - if (!Objects.equals(attemptedMax, item.attempted())) { - mismatchDescription - .appendText("attemptedMax: ").appendValue(attemptedMax) - .appendText(" != ").appendValue(item.attempted()); - } - - mismatchDescription.appendText("}"); - } - }; + return distributionMinMax(namespace, name, step, attemptedMin, attemptedMax, false); } static Matcher<MetricResult<DistributionResult>> distributionCommittedMinMax( final String namespace, final String name, final String step, final Long committedMin, final Long committedMax) { + return distributionMinMax(namespace, name, step, committedMin, committedMax, true); + } + + static Matcher<MetricResult<DistributionResult>> distributionMinMax( + final String namespace, final String name, final String step, + final Long min, final Long max, final boolean isCommitted) { + final String metricState = isCommitted ? "committed" : "attempted"; return new TypeSafeMatcher<MetricResult<DistributionResult>>() { @Override protected boolean matchesSafely(MetricResult<DistributionResult> item) { + DistributionResult metricValue = isCommitted ? item.committed() : item.attempted(); return Objects.equals(namespace, item.name().namespace()) && Objects.equals(name, item.name().name()) && item.step().contains(step) - && Objects.equals(committedMin, item.committed().min()) - && Objects.equals(committedMax, item.committed().max()); + && Objects.equals(min, metricValue.min()) + && Objects.equals(max, metricValue.max()); } @Override @@ -240,8 +185,8 @@ public class MetricMatchers { .appendText("MetricResult{inNamespace=").appendValue(namespace) .appendText(", name=").appendValue(name) .appendText(", step=").appendValue(step) - .appendText(", committedMin=").appendValue(committedMin) - .appendText(", committedMax=").appendValue(committedMax) + .appendText(String.format(", %sMin=", metricState)).appendValue(min) + .appendText(String.format(", %sMax=", metricState)).appendValue(max) .appendText("}"); } @@ -251,17 +196,18 @@ public class MetricMatchers { mismatchDescription.appendText("MetricResult{"); describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); + DistributionResult metricValue = isCommitted ? item.committed() : item.attempted(); - if (!Objects.equals(committedMin, item.committed())) { + if (!Objects.equals(min, metricValue.min())) { mismatchDescription - .appendText("committedMin: ").appendValue(committedMin) - .appendText(" != ").appendValue(item.committed()); + .appendText(String.format("%sMin: ", metricState)).appendValue(min) + .appendText(" != ").appendValue(metricValue.min()); } - if (!Objects.equals(committedMax, item.committed())) { + if (!Objects.equals(max, metricValue.max())) { mismatchDescription - .appendText("committedMax: ").appendValue(committedMax) - .appendText(" != ").appendValue(item.committed()); + .appendText(String.format("%sMax: ", metricState)).appendValue(max) + .appendText(" != ").appendValue(metricValue.max()); } mismatchDescription.appendText("}"); http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index c7068e1..8077c27 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -19,9 +19,8 @@ package org.apache.beam.sdk.metrics; import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; -import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult; -import static org.apache.beam.sdk.metrics.MetricMatchers.distributionAttemptedMinMax; -import static org.apache.beam.sdk.metrics.MetricMatchers.distributionCommittedMinMax; +import static org.apache.beam.sdk.metrics.MetricMatchers.distributionMinMax; +import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertNull; @@ -33,6 +32,9 @@ import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesAttemptedMetrics; import org.apache.beam.sdk.testing.UsesCommittedMetrics; +import org.apache.beam.sdk.testing.UsesCounterMetrics; +import org.apache.beam.sdk.testing.UsesDistributionMetrics; +import org.apache.beam.sdk.testing.UsesGaugeMetrics; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -58,6 +60,13 @@ public class MetricsTest implements Serializable { private static final String NAMESPACE = MetricsTest.class.getName(); private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName(); + private static MetricQueryResults queryTestMetrics(PipelineResult result) { + return result.metrics().queryMetrics( + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + } + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -67,14 +76,14 @@ public class MetricsTest implements Serializable { } @Test - public void distributionWithoutContainer() { + public void testDistributionWithoutContainer() { assertNull(MetricsEnvironment.getCurrentContainer()); // Should not fail even though there is no metrics container. Metrics.distribution(NS, NAME).update(5L); } @Test - public void counterWithoutContainer() { + public void testCounterWithoutContainer() { assertNull(MetricsEnvironment.getCurrentContainer()); // Should not fail even though there is no metrics container. Counter counter = Metrics.counter(NS, NAME); @@ -85,7 +94,7 @@ public class MetricsTest implements Serializable { } @Test - public void distributionToCell() { + public void testDistributionToCell() { MetricsContainer container = new MetricsContainer("step"); MetricsEnvironment.setCurrentContainer(container); @@ -104,7 +113,7 @@ public class MetricsTest implements Serializable { } @Test - public void counterToCell() { + public void testCounterToCell() { MetricsContainer container = new MetricsContainer("step"); MetricsEnvironment.setCurrentContainer(container); Counter counter = Metrics.counter(NS, NAME); @@ -122,62 +131,73 @@ public class MetricsTest implements Serializable { assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L)); } - @Category({ValidatesRunner.class, UsesCommittedMetrics.class}) + @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class, + UsesDistributionMetrics.class, UsesGaugeMetrics.class}) @Test - public void committedMetricsReportToQuery() { + public void testAllCommittedMetrics() { PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - - assertThat(metrics.counters(), hasItem( - committedMetricsResult(NAMESPACE, "count", "MyStep1", 3L))); - assertThat(metrics.distributions(), hasItem( - committedMetricsResult(NAMESPACE, "input", "MyStep1", - DistributionResult.create(26L, 3L, 5L, 13L)))); + assertAllMetrics(metrics, true); + } - assertThat(metrics.counters(), hasItem( - committedMetricsResult(NAMESPACE, "count", "MyStep2", 6L))); - assertThat(metrics.distributions(), hasItem( - committedMetricsResult(NAMESPACE, "input", "MyStep2", - DistributionResult.create(52L, 6L, 5L, 13L)))); - assertThat(metrics.gauges(), hasItem( - committedMetricsResult(NAMESPACE, "my-gauge", "MyStep2", - GaugeResult.create(12L, Instant.now())))); + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class, + UsesDistributionMetrics.class, UsesGaugeMetrics.class}) + @Test + public void testAllAttemptedMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); - assertThat(metrics.distributions(), hasItem( - distributionCommittedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L))); + // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. + assertAllMetrics(metrics, false); } + @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class}) + @Test + public void testCommittedCounterMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); + assertCounterMetrics(metrics, true); + } - @Category({ValidatesRunner.class, UsesAttemptedMetrics.class}) + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) @Test - public void attemptedMetricsReportToQuery() { + public void testAttemptedCounterMetrics() { PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); + assertCounterMetrics(metrics, false); + } - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); + @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesDistributionMetrics.class}) + @Test + public void testCommittedDistributionMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); + assertDistributionMetrics(metrics, true); + } - // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. - assertThat(metrics.counters(), hasItem( - attemptedMetricsResult(NAMESPACE, "count", "MyStep1", 3L))); - assertThat(metrics.distributions(), hasItem( - attemptedMetricsResult(NAMESPACE, "input", "MyStep1", - DistributionResult.create(26L, 3L, 5L, 13L)))); + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesDistributionMetrics.class}) + @Test + public void testAttemptedDistributionMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); + assertDistributionMetrics(metrics, false); + } - assertThat(metrics.counters(), hasItem( - attemptedMetricsResult(NAMESPACE, "count", "MyStep2", 6L))); - assertThat(metrics.distributions(), hasItem( - attemptedMetricsResult(NAMESPACE, "input", "MyStep2", - DistributionResult.create(52L, 6L, 5L, 13L)))); - assertThat(metrics.gauges(), hasItem( - attemptedMetricsResult(NAMESPACE, "my-gauge", "MyStep2", - GaugeResult.create(12L, Instant.now())))); + @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesGaugeMetrics.class}) + @Test + public void testCommittedGaugeMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); + assertGaugeMetrics(metrics, true); + } - assertThat(metrics.distributions(), hasItem( - distributionAttemptedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L))); + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesGaugeMetrics.class}) + @Test + public void testAttemptedGaugeMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); + assertGaugeMetrics(metrics, false); } private PipelineResult runPipelineWithMetrics() { @@ -232,8 +252,39 @@ public class MetricsTest implements Serializable { return result; } + private static void assertCounterMetrics(MetricQueryResults metrics, boolean isCommitted) { + assertThat(metrics.counters(), hasItem( + metricsResult(NAMESPACE, "count", "MyStep1", 3L, isCommitted))); + assertThat(metrics.counters(), hasItem( + metricsResult(NAMESPACE, "count", "MyStep2", 6L, isCommitted))); + } + + private static void assertGaugeMetrics(MetricQueryResults metrics, boolean isCommitted) { + assertThat(metrics.gauges(), hasItem( + metricsResult(NAMESPACE, "my-gauge", "MyStep2", + GaugeResult.create(12L, Instant.now()), isCommitted))); + } + + private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) { + assertThat(metrics.distributions(), hasItem( + metricsResult(NAMESPACE, "input", "MyStep1", + DistributionResult.create(26L, 3L, 5L, 13L), isCommitted))); + + assertThat(metrics.distributions(), hasItem( + metricsResult(NAMESPACE, "input", "MyStep2", + DistributionResult.create(52L, 6L, 5L, 13L), isCommitted))); + assertThat(metrics.distributions(), hasItem( + distributionMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L, isCommitted))); + } + + private static void assertAllMetrics(MetricQueryResults metrics, boolean isCommitted) { + assertCounterMetrics(metrics, isCommitted); + assertDistributionMetrics(metrics, isCommitted); + assertGaugeMetrics(metrics, isCommitted); + } + @Test - @Category({ValidatesRunner.class, UsesAttemptedMetrics.class}) + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) public void testBoundedSourceMetrics() { long numElements = 1000; @@ -259,11 +310,10 @@ public class MetricsTest implements Serializable { } @Test - @Category({ValidatesRunner.class, UsesAttemptedMetrics.class}) + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) public void testUnboundedSourceMetrics() { long numElements = 1000; - // Use withMaxReadTime to force unbounded mode. pipeline.apply( GenerateSequence.from(0).to(numElements).withMaxReadTime(Duration.standardDays(1)));
