Repository: beam Updated Branches: refs/heads/release-2.0.0 e08cac055 -> 7485583a3
http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java deleted file mode 100644 index a0dd119..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.metrics; - -import java.util.Objects; -import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; - -/** - * Matchers for metrics. - */ -public class MetricMatchers { - - /** - * Matches a {@link MetricUpdate} with the given name and contents. - * - * <p>Visible since it may be used in runner-specific tests. - */ - public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, final T update) { - return new TypeSafeMatcher<MetricUpdate<T>>() { - @Override - protected boolean matchesSafely(MetricUpdate<T> item) { - return Objects.equals(name, item.getKey().metricName().name()) - && Objects.equals(update, item.getUpdate()); - } - - @Override - public void describeTo(Description description) { - description - .appendText("MetricUpdate{name=").appendValue(name) - .appendText(", update=").appendValue(update) - .appendText("}"); - } - }; - } - - /** - * Matches a {@link MetricUpdate} with the given namespace, name, step and contents. - * - * <p>Visible since it may be used in runner-specific tests. - */ - public static <T> Matcher<MetricUpdate<T>> metricUpdate( - final String namespace, final String name, final String step, final T update) { - return new TypeSafeMatcher<MetricUpdate<T>>() { - @Override - protected boolean matchesSafely(MetricUpdate<T> item) { - return Objects.equals(namespace, item.getKey().metricName().namespace()) - && Objects.equals(name, item.getKey().metricName().name()) - && Objects.equals(step, item.getKey().stepName()) - && Objects.equals(update, item.getUpdate()); - } - - @Override - public void describeTo(Description description) { - description - .appendText("MetricUpdate{inNamespace=").appendValue(namespace) - .appendText(", name=").appendValue(name) - .appendText(", step=").appendValue(step) - .appendText(", update=").appendValue(update) - .appendText("}"); - } - }; - } - - /** - * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals - * the given value for attempted metrics. - */ - public static <T> Matcher<MetricResult<T>> attemptedMetricsResult( - final String namespace, final String name, final String step, final T value) { - return metricsResult(namespace, name, step, value, false); - } - - /** - * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals - * the given value for committed metrics. - */ - public static <T> Matcher<MetricResult<T>> committedMetricsResult( - final String namespace, final String name, final String step, final T value) { - return metricsResult(namespace, name, step, value, true); - } - - /** - * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals - * the given value for either committed or attempted (based on {@code isCommitted}) metrics. - */ - public static <T> Matcher<MetricResult<T>> metricsResult( - final String namespace, final String name, final String step, final T value, - final boolean isCommitted) { - final String metricState = isCommitted ? "committed" : "attempted"; - return new TypeSafeMatcher<MetricResult<T>>() { - @Override - protected boolean matchesSafely(MetricResult<T> item) { - final T metricValue = isCommitted ? item.committed() : item.attempted(); - return Objects.equals(namespace, item.name().namespace()) - && Objects.equals(name, item.name().name()) - && item.step().contains(step) - && metricResultsEqual(value, metricValue); - } - - @Override - public void describeTo(Description description) { - description - .appendText("MetricResult{inNamespace=").appendValue(namespace) - .appendText(", name=").appendValue(name) - .appendText(", step=").appendValue(step) - .appendText(String.format(", %s=", metricState)).appendValue(value) - .appendText("}"); - } - - @Override - protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) { - mismatchDescription.appendText("MetricResult{"); - final T metricValue = isCommitted ? item.committed() : item.attempted(); - - describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); - - if (!Objects.equals(value, metricValue)) { - mismatchDescription - .appendText(String.format("%s: ", metricState)).appendValue(value) - .appendText(" != ").appendValue(metricValue); - } - - mismatchDescription.appendText("}"); - } - }; - } - - private static <T> boolean metricResultsEqual(T result1, T result2) { - if (result1 instanceof GaugeResult) { - return (((GaugeResult) result1).value()) == (((GaugeResult) result2).value()); - } else { - return result1.equals(result2); - } - } - - static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax( - final String namespace, final String name, final String step, - final Long attemptedMin, final Long attemptedMax) { - return distributionMinMax(namespace, name, step, attemptedMin, attemptedMax, false); - } - - static Matcher<MetricResult<DistributionResult>> distributionCommittedMinMax( - final String namespace, final String name, final String step, - final Long committedMin, final Long committedMax) { - return distributionMinMax(namespace, name, step, committedMin, committedMax, true); - } - - static Matcher<MetricResult<DistributionResult>> distributionMinMax( - final String namespace, final String name, final String step, - final Long min, final Long max, final boolean isCommitted) { - final String metricState = isCommitted ? "committed" : "attempted"; - return new TypeSafeMatcher<MetricResult<DistributionResult>>() { - @Override - protected boolean matchesSafely(MetricResult<DistributionResult> item) { - DistributionResult metricValue = isCommitted ? item.committed() : item.attempted(); - return Objects.equals(namespace, item.name().namespace()) - && Objects.equals(name, item.name().name()) - && item.step().contains(step) - && Objects.equals(min, metricValue.min()) - && Objects.equals(max, metricValue.max()); - } - - @Override - public void describeTo(Description description) { - description - .appendText("MetricResult{inNamespace=").appendValue(namespace) - .appendText(", name=").appendValue(name) - .appendText(", step=").appendValue(step) - .appendText(String.format(", %sMin=", metricState)).appendValue(min) - .appendText(String.format(", %sMax=", metricState)).appendValue(max) - .appendText("}"); - } - - @Override - protected void describeMismatchSafely(MetricResult<DistributionResult> item, - Description mismatchDescription) { - mismatchDescription.appendText("MetricResult{"); - - describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); - DistributionResult metricValue = isCommitted ? item.committed() : item.attempted(); - - if (!Objects.equals(min, metricValue.min())) { - mismatchDescription - .appendText(String.format("%sMin: ", metricState)).appendValue(min) - .appendText(" != ").appendValue(metricValue.min()); - } - - if (!Objects.equals(max, metricValue.max())) { - mismatchDescription - .appendText(String.format("%sMax: ", metricState)).appendValue(max) - .appendText(" != ").appendValue(metricValue.max()); - } - - mismatchDescription.appendText("}"); - } - }; - } - - private static <T> void describeMetricsResultMembersMismatch( - MetricResult<T> item, - Description mismatchDescription, - String namespace, - String name, - String step) { - if (!Objects.equals(namespace, item.name().namespace())) { - mismatchDescription - .appendText("inNamespace: ").appendValue(namespace) - .appendText(" != ").appendValue(item.name().namespace()); - } - - if (!Objects.equals(name, item.name().name())) { - mismatchDescription - .appendText("name: ").appendValue(name) - .appendText(" != ").appendValue(item.name().name()); - } - - if (!item.step().contains(step)) { - mismatchDescription - .appendText("step: ").appendValue(step) - .appendText(" != ").appendValue(item.step()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java new file mode 100644 index 0000000..5031952 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import java.util.Objects; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matchers for {@link MetricResults}. + */ +public class MetricResultsMatchers { + + /** + * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals + * the given value for attempted metrics. + */ + public static <T> Matcher<MetricResult<T>> attemptedMetricsResult( + final String namespace, final String name, final String step, final T value) { + return metricsResult(namespace, name, step, value, false); + } + + /** + * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals + * the given value for committed metrics. + */ + public static <T> Matcher<MetricResult<T>> committedMetricsResult( + final String namespace, final String name, final String step, final T value) { + return metricsResult(namespace, name, step, value, true); + } + + /** + * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals + * the given value for either committed or attempted (based on {@code isCommitted}) metrics. + */ + public static <T> Matcher<MetricResult<T>> metricsResult( + final String namespace, final String name, final String step, final T value, + final boolean isCommitted) { + final String metricState = isCommitted ? "committed" : "attempted"; + return new TypeSafeMatcher<MetricResult<T>>() { + @Override + protected boolean matchesSafely(MetricResult<T> item) { + final T metricValue = isCommitted ? item.committed() : item.attempted(); + return Objects.equals(namespace, item.name().namespace()) + && Objects.equals(name, item.name().name()) + && item.step().contains(step) + && metricResultsEqual(value, metricValue); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricResult{inNamespace=").appendValue(namespace) + .appendText(", name=").appendValue(name) + .appendText(", step=").appendValue(step) + .appendText(String.format(", %s=", metricState)).appendValue(value) + .appendText("}"); + } + + @Override + protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) { + mismatchDescription.appendText("MetricResult{"); + final T metricValue = isCommitted ? item.committed() : item.attempted(); + + describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); + + if (!Objects.equals(value, metricValue)) { + mismatchDescription + .appendText(String.format("%s: ", metricState)).appendValue(value) + .appendText(" != ").appendValue(metricValue); + } + + mismatchDescription.appendText("}"); + } + }; + } + + private static <T> boolean metricResultsEqual(T result1, T result2) { + if (result1 instanceof GaugeResult) { + return (((GaugeResult) result1).value()) == (((GaugeResult) result2).value()); + } else { + return result1.equals(result2); + } + } + + static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax( + final String namespace, final String name, final String step, + final Long attemptedMin, final Long attemptedMax) { + return distributionMinMax(namespace, name, step, attemptedMin, attemptedMax, false); + } + + static Matcher<MetricResult<DistributionResult>> distributionCommittedMinMax( + final String namespace, final String name, final String step, + final Long committedMin, final Long committedMax) { + return distributionMinMax(namespace, name, step, committedMin, committedMax, true); + } + + public static Matcher<MetricResult<DistributionResult>> distributionMinMax( + final String namespace, final String name, final String step, + final Long min, final Long max, final boolean isCommitted) { + final String metricState = isCommitted ? "committed" : "attempted"; + return new TypeSafeMatcher<MetricResult<DistributionResult>>() { + @Override + protected boolean matchesSafely(MetricResult<DistributionResult> item) { + DistributionResult metricValue = isCommitted ? item.committed() : item.attempted(); + return Objects.equals(namespace, item.name().namespace()) + && Objects.equals(name, item.name().name()) + && item.step().contains(step) + && Objects.equals(min, metricValue.min()) + && Objects.equals(max, metricValue.max()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricResult{inNamespace=").appendValue(namespace) + .appendText(", name=").appendValue(name) + .appendText(", step=").appendValue(step) + .appendText(String.format(", %sMin=", metricState)).appendValue(min) + .appendText(String.format(", %sMax=", metricState)).appendValue(max) + .appendText("}"); + } + + @Override + protected void describeMismatchSafely(MetricResult<DistributionResult> item, + Description mismatchDescription) { + mismatchDescription.appendText("MetricResult{"); + + describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); + DistributionResult metricValue = isCommitted ? item.committed() : item.attempted(); + + if (!Objects.equals(min, metricValue.min())) { + mismatchDescription + .appendText(String.format("%sMin: ", metricState)).appendValue(min) + .appendText(" != ").appendValue(metricValue.min()); + } + + if (!Objects.equals(max, metricValue.max())) { + mismatchDescription + .appendText(String.format("%sMax: ", metricState)).appendValue(max) + .appendText(" != ").appendValue(metricValue.max()); + } + + mismatchDescription.appendText("}"); + } + }; + } + + private static <T> void describeMetricsResultMembersMismatch( + MetricResult<T> item, + Description mismatchDescription, + String namespace, + String name, + String step) { + if (!Objects.equals(namespace, item.name().namespace())) { + mismatchDescription + .appendText("inNamespace: ").appendValue(namespace) + .appendText(" != ").appendValue(item.name().namespace()); + } + + if (!Objects.equals(name, item.name().name())) { + mismatchDescription + .appendText("name: ").appendValue(name) + .appendText(" != ").appendValue(item.name().name()); + } + + if (!item.step().contains(step)) { + mismatchDescription + .appendText("step: ").appendValue(step) + .appendText(" != ").appendValue(item.step()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java deleted file mode 100644 index 0428ce1..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult; -import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; -import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asMetricResults; -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import java.io.Closeable; -import java.io.IOException; -import org.hamcrest.collection.IsIterableWithSize; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -/** - * Tests for {@link MetricsContainerStepMap}. - */ -public class MetricsContainerStepMapTest { - - private static final String NAMESPACE = MetricsContainerStepMapTest.class.getName(); - private static final String STEP1 = "myStep1"; - private static final String STEP2 = "myStep2"; - - private static final long VALUE = 100; - - private static final Counter counter = - Metrics.counter( - MetricsContainerStepMapTest.class, - "myCounter"); - private static final Distribution distribution = - Metrics.distribution( - MetricsContainerStepMapTest.class, - "myDistribution"); - private static final Gauge gauge = - Metrics.gauge( - MetricsContainerStepMapTest.class, - "myGauge"); - - private static final MetricsContainer metricsContainer; - - static { - metricsContainer = new MetricsContainer(null); - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { - counter.inc(VALUE); - distribution.update(VALUE); - distribution.update(VALUE * 2); - gauge.set(VALUE); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - @Test - public void testAttemptedAccumulatedMetricResults() { - MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); - attemptedMetrics.update(STEP1, metricsContainer); - attemptedMetrics.update(STEP2, metricsContainer); - attemptedMetrics.update(STEP2, metricsContainer); - - MetricResults metricResults = - asAttemptedOnlyMetricResults(attemptedMetrics); - - MetricQueryResults step1res = - metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); - - assertIterableSize(step1res.counters(), 1); - assertIterableSize(step1res.distributions(), 1); - assertIterableSize(step1res.gauges(), 1); - - assertCounter(step1res, STEP1, VALUE, false); - assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), - false); - assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false); - - MetricQueryResults step2res = - metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); - - assertIterableSize(step2res.counters(), 1); - assertIterableSize(step2res.distributions(), 1); - assertIterableSize(step2res.gauges(), 1); - - assertCounter(step2res, STEP2, VALUE * 2, false); - assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), - false); - assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); - - MetricQueryResults allres = - metricResults.queryMetrics(MetricsFilter.builder().build()); - - assertIterableSize(allres.counters(), 2); - assertIterableSize(allres.distributions(), 2); - assertIterableSize(allres.gauges(), 2); - } - - @Test - public void testCounterCommittedUnsupportedInAttemptedAccumulatedMetricResults() { - MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); - attemptedMetrics.update(STEP1, metricsContainer); - MetricResults metricResults = - asAttemptedOnlyMetricResults(attemptedMetrics); - - MetricQueryResults step1res = - metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); - - thrown.expect(UnsupportedOperationException.class); - thrown.expectMessage("This runner does not currently support committed metrics results."); - - assertCounter(step1res, STEP1, VALUE, true); - } - - @Test - public void testDistributionCommittedUnsupportedInAttemptedAccumulatedMetricResults() { - MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); - attemptedMetrics.update(STEP1, metricsContainer); - MetricResults metricResults = - asAttemptedOnlyMetricResults(attemptedMetrics); - - MetricQueryResults step1res = - metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); - - thrown.expect(UnsupportedOperationException.class); - thrown.expectMessage("This runner does not currently support committed metrics results."); - - assertDistribution(step1res, STEP1, DistributionResult.ZERO, true); - } - - @Test - public void testGaugeCommittedUnsupportedInAttemptedAccumulatedMetricResults() { - MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); - attemptedMetrics.update(STEP1, metricsContainer); - MetricResults metricResults = - asAttemptedOnlyMetricResults(attemptedMetrics); - - MetricQueryResults step1res = - metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); - - thrown.expect(UnsupportedOperationException.class); - thrown.expectMessage("This runner does not currently support committed metrics results."); - - assertGauge(step1res, STEP1, GaugeResult.empty(), true); - } - - @Test - public void testAttemptedAndCommittedAccumulatedMetricResults() { - MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); - attemptedMetrics.update(STEP1, metricsContainer); - attemptedMetrics.update(STEP1, metricsContainer); - attemptedMetrics.update(STEP2, metricsContainer); - attemptedMetrics.update(STEP2, metricsContainer); - attemptedMetrics.update(STEP2, metricsContainer); - - MetricsContainerStepMap committedMetrics = new MetricsContainerStepMap(); - committedMetrics.update(STEP1, metricsContainer); - committedMetrics.update(STEP2, metricsContainer); - committedMetrics.update(STEP2, metricsContainer); - - MetricResults metricResults = - asMetricResults(attemptedMetrics, committedMetrics); - - MetricQueryResults step1res = - metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); - - assertIterableSize(step1res.counters(), 1); - assertIterableSize(step1res.distributions(), 1); - assertIterableSize(step1res.gauges(), 1); - - assertCounter(step1res, STEP1, VALUE * 2, false); - assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), - false); - assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false); - - assertCounter(step1res, STEP1, VALUE, true); - assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), - true); - assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), true); - - MetricQueryResults step2res = - metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); - - assertIterableSize(step2res.counters(), 1); - assertIterableSize(step2res.distributions(), 1); - assertIterableSize(step2res.gauges(), 1); - - assertCounter(step2res, STEP2, VALUE * 3, false); - assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2), - false); - assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); - - assertCounter(step2res, STEP2, VALUE * 2, true); - assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), - true); - assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), true); - - MetricQueryResults allres = - metricResults.queryMetrics(MetricsFilter.builder().build()); - - assertIterableSize(allres.counters(), 2); - assertIterableSize(allres.distributions(), 2); - assertIterableSize(allres.gauges(), 2); - } - - private <T> void assertIterableSize(Iterable<T> iterable, int size) { - assertThat(iterable, IsIterableWithSize.<T>iterableWithSize(size)); - } - - private void assertCounter( - MetricQueryResults metricQueryResults, - String step, - Long expected, - boolean isCommitted) { - assertThat( - metricQueryResults.counters(), - hasItem(metricsResult(NAMESPACE, counter.getName().name(), step, expected, isCommitted))); - } - - private void assertDistribution( - MetricQueryResults metricQueryResults, - String step, - DistributionResult expected, - boolean isCommitted) { - assertThat( - metricQueryResults.distributions(), - hasItem(metricsResult(NAMESPACE, distribution.getName().name(), step, expected, - isCommitted))); - } - - private void assertGauge( - MetricQueryResults metricQueryResults, - String step, - GaugeResult expected, - boolean isCommitted) { - assertThat( - metricQueryResults.gauges(), - hasItem(metricsResult(NAMESPACE, gauge.getName().name(), step, expected, isCommitted))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java deleted file mode 100644 index 38c00d3..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.metrics; - -import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.junit.Assert.assertThat; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link MetricsContainer}. - */ -@RunWith(JUnit4.class) -public class MetricsContainerTest { - - @Test - public void testCounterDeltas() { - MetricsContainer container = new MetricsContainer("step1"); - CounterCell c1 = container.getCounter(MetricName.named("ns", "name1")); - CounterCell c2 = container.getCounter(MetricName.named("ns", "name2")); - assertThat("All counters should start out dirty", - container.getUpdates().counterUpdates(), containsInAnyOrder( - metricUpdate("name1", 0L), - metricUpdate("name2", 0L))); - container.commitUpdates(); - assertThat("After commit no counters should be dirty", - container.getUpdates().counterUpdates(), emptyIterable()); - - c1.update(5L); - c2.update(4L); - - assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder( - metricUpdate("name1", 5L), - metricUpdate("name2", 4L))); - - assertThat("Since we haven't committed, updates are still included", - container.getUpdates().counterUpdates(), containsInAnyOrder( - metricUpdate("name1", 5L), - metricUpdate("name2", 4L))); - - container.commitUpdates(); - assertThat("After commit there are no updates", - container.getUpdates().counterUpdates(), emptyIterable()); - - c1.update(8L); - assertThat(container.getUpdates().counterUpdates(), contains( - metricUpdate("name1", 13L))); - } - - @Test - public void testCounterCumulatives() { - MetricsContainer container = new MetricsContainer("step1"); - CounterCell c1 = container.getCounter(MetricName.named("ns", "name1")); - CounterCell c2 = container.getCounter(MetricName.named("ns", "name2")); - c1.update(2L); - c2.update(4L); - c1.update(3L); - - container.getUpdates(); - container.commitUpdates(); - assertThat("Committing updates shouldn't affect cumulative counter values", - container.getCumulative().counterUpdates(), containsInAnyOrder( - metricUpdate("name1", 5L), - metricUpdate("name2", 4L))); - - c1.update(8L); - assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder( - metricUpdate("name1", 13L), - metricUpdate("name2", 4L))); - } - - @Test - public void testDistributionDeltas() { - MetricsContainer container = new MetricsContainer("step1"); - DistributionCell c1 = container.getDistribution(MetricName.named("ns", "name1")); - DistributionCell c2 = container.getDistribution(MetricName.named("ns", "name2")); - - assertThat("Initial update includes initial zero-values", - container.getUpdates().distributionUpdates(), containsInAnyOrder( - metricUpdate("name1", DistributionData.EMPTY), - metricUpdate("name2", DistributionData.EMPTY))); - - container.commitUpdates(); - assertThat("No updates after commit", - container.getUpdates().distributionUpdates(), emptyIterable()); - - c1.update(5L); - c2.update(4L); - - assertThat(container.getUpdates().distributionUpdates(), containsInAnyOrder( - metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), - metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); - assertThat("Updates stay the same without commit", - container.getUpdates().distributionUpdates(), containsInAnyOrder( - metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), - metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); - - container.commitUpdates(); - assertThat("No updatess after commit", - container.getUpdates().distributionUpdates(), emptyIterable()); - - c1.update(8L); - c1.update(4L); - assertThat(container.getUpdates().distributionUpdates(), contains( - metricUpdate("name1", DistributionData.create(17, 3, 4, 8)))); - container.commitUpdates(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java index 0ce17b4..a29c13b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java @@ -18,15 +18,16 @@ package org.apache.beam.sdk.metrics; -import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate; -import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; /** * Tests for {@link MetricsEnvironment}. @@ -41,8 +42,13 @@ public class MetricsEnvironmentTest { @Test public void testUsesAppropriateMetricsContainer() { Counter counter = Metrics.counter("ns", "name"); - MetricsContainer c1 = new MetricsContainer("step1"); - MetricsContainer c2 = new MetricsContainer("step2"); + + MetricsContainer c1 = Mockito.mock(MetricsContainer.class); + MetricsContainer c2 = Mockito.mock(MetricsContainer.class); + Counter counter1 = Mockito.mock(Counter.class); + Counter counter2 = Mockito.mock(Counter.class); + when(c1.getCounter(MetricName.named("ns", "name"))).thenReturn(counter1); + when(c2.getCounter(MetricName.named("ns", "name"))).thenReturn(counter2); MetricsEnvironment.setCurrentContainer(c1); counter.inc(); @@ -50,10 +56,9 @@ public class MetricsEnvironmentTest { counter.dec(); MetricsEnvironment.setCurrentContainer(null); - MetricUpdates updates1 = c1.getUpdates(); - MetricUpdates updates2 = c2.getUpdates(); - assertThat(updates1.counterUpdates(), contains(metricUpdate("ns", "name", "step1", 1L))); - assertThat(updates2.counterUpdates(), contains(metricUpdate("ns", "name", "step2", -1L))); + verify(counter1).inc(1L); + verify(counter2).inc(-1L); + verifyNoMoreInteractions(counter1, counter2); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java deleted file mode 100644 index 4104f8d..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.metrics; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; -import static org.junit.Assert.assertThat; - -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -/** - * Tests for {@link MetricsMap}. - */ -@RunWith(JUnit4.class) -public class MetricsMapTest { - - public MetricsMap<String, AtomicLong> metricsMap = - new MetricsMap<>(new MetricsMap.Factory<String, AtomicLong>() { - @Override - public AtomicLong createInstance(String unusedKey) { - return new AtomicLong(); - } - }); - - @Test - public void testCreateSeparateInstances() { - AtomicLong foo = metricsMap.get("foo"); - AtomicLong bar = metricsMap.get("bar"); - - assertThat(foo, not(sameInstance(bar))); - } - - @Test - public void testReuseInstances() { - AtomicLong foo1 = metricsMap.get("foo"); - AtomicLong foo2 = metricsMap.get("foo"); - - assertThat(foo1, sameInstance(foo2)); - } - - @Test - public void testGet() { - assertThat(metricsMap.tryGet("foo"), nullValue(AtomicLong.class)); - - AtomicLong foo = metricsMap.get("foo"); - assertThat(metricsMap.tryGet("foo"), sameInstance(foo)); - } - - @Test - public void testGetEntries() { - AtomicLong foo = metricsMap.get("foo"); - AtomicLong bar = metricsMap.get("bar"); - assertThat(metricsMap.entries(), containsInAnyOrder( - hasEntry("foo", foo), - hasEntry("bar", bar))); - } - - private static Matcher<Map.Entry<String, AtomicLong>> hasEntry( - final String key, final AtomicLong value) { - return new TypeSafeMatcher<Entry<String, AtomicLong>>() { - - @Override - public void describeTo(Description description) { - description - .appendText("Map.Entry{key=").appendValue(key) - .appendText(", value=").appendValue(value) - .appendText("}"); - } - - @Override - protected boolean matchesSafely(Entry<String, AtomicLong> item) { - return Objects.equals(key, item.getKey()) - && Objects.equals(value, item.getValue()); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 084c445..bc768f8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -18,13 +18,14 @@ package org.apache.beam.sdk.metrics; -import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; -import static org.apache.beam.sdk.metrics.MetricMatchers.distributionMinMax; -import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult; -import static org.hamcrest.Matchers.equalTo; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.distributionMinMax; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.Serializable; import org.apache.beam.sdk.PipelineResult; @@ -41,13 +42,13 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.hamcrest.CoreMatchers; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; /** * Tests for {@link Metrics}. @@ -95,40 +96,40 @@ public class MetricsTest implements Serializable { @Test public void testDistributionToCell() { - MetricsContainer container = new MetricsContainer("step"); - MetricsEnvironment.setCurrentContainer(container); + MetricsContainer mockContainer = Mockito.mock(MetricsContainer.class); + Distribution mockDistribution = Mockito.mock(Distribution.class); + when(mockContainer.getDistribution(METRIC_NAME)).thenReturn(mockDistribution); Distribution distribution = Metrics.distribution(NS, NAME); + MetricsEnvironment.setCurrentContainer(mockContainer); distribution.update(5L); - DistributionCell cell = container.getDistribution(METRIC_NAME); - assertThat(cell.getCumulative(), equalTo(DistributionData.create(5, 1, 5, 5))); + verify(mockDistribution).update(5L); distribution.update(36L); - assertThat(cell.getCumulative(), equalTo(DistributionData.create(41, 2, 5, 36))); - distribution.update(1L); - assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 1, 36))); + verify(mockDistribution).update(36L); + verify(mockDistribution).update(1L); } @Test public void testCounterToCell() { - MetricsContainer container = new MetricsContainer("step"); - MetricsEnvironment.setCurrentContainer(container); + MetricsContainer mockContainer = Mockito.mock(MetricsContainer.class); + Counter mockCounter = Mockito.mock(Counter.class); + when(mockContainer.getCounter(METRIC_NAME)).thenReturn(mockCounter); + Counter counter = Metrics.counter(NS, NAME); - CounterCell cell = container.getCounter(METRIC_NAME); + + MetricsEnvironment.setCurrentContainer(mockContainer); counter.inc(); - assertThat(cell.getCumulative(), CoreMatchers.equalTo(1L)); + verify(mockCounter).inc(1); counter.inc(47L); - assertThat(cell.getCumulative(), CoreMatchers.equalTo(48L)); + verify(mockCounter).inc(47); counter.dec(5L); - assertThat(cell.getCumulative(), CoreMatchers.equalTo(43L)); - - counter.dec(); - assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L)); + verify(mockCounter).inc(-5); } @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class, http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 12b7c78..691f7f4 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.kafka; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; @@ -54,7 +55,6 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer; import org.apache.beam.sdk.metrics.GaugeResult; -import org.apache.beam.sdk.metrics.MetricMatchers; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; @@ -587,33 +587,29 @@ public class KafkaIOTest { Iterable<MetricResult<Long>> counters = metrics.counters(); - assertThat(counters, hasItem( - MetricMatchers.attemptedMetricsResult( - elementsRead.namespace(), - elementsRead.name(), - readStep, - 1000L))); - - assertThat(counters, hasItem( - MetricMatchers.attemptedMetricsResult( - elementsReadBySplit.namespace(), - elementsReadBySplit.name(), - readStep, - 1000L))); - - assertThat(counters, hasItem( - MetricMatchers.attemptedMetricsResult( - bytesRead.namespace(), - bytesRead.name(), - readStep, - 12000L))); - - assertThat(counters, hasItem( - MetricMatchers.attemptedMetricsResult( - bytesReadBySplit.namespace(), - bytesReadBySplit.name(), - readStep, - 12000L))); + assertThat(counters, hasItem(attemptedMetricsResult( + elementsRead.namespace(), + elementsRead.name(), + readStep, + 1000L))); + + assertThat(counters, hasItem(attemptedMetricsResult( + elementsReadBySplit.namespace(), + elementsReadBySplit.name(), + readStep, + 1000L))); + + assertThat(counters, hasItem(attemptedMetricsResult( + bytesRead.namespace(), + bytesRead.name(), + readStep, + 12000L))); + + assertThat(counters, hasItem(attemptedMetricsResult( + bytesReadBySplit.namespace(), + bytesReadBySplit.name(), + readStep, + 12000L))); MetricQueryResults backlogElementsMetrics = result.metrics().queryMetrics( @@ -912,7 +908,7 @@ public class KafkaIOTest { assertThat(metrics.counters(), hasItem( - MetricMatchers.attemptedMetricsResult( + attemptedMetricsResult( elementsWritten.namespace(), elementsWritten.name(), "writeToKafka",
