http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 3dd98e0..9e71300 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -38,10 +38,10 @@ import static org.mockito.Mockito.withSettings; import com.google.common.collect.Iterables; import java.util.List; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -145,7 +145,8 @@ public class ReduceFnRunnerTest { @Test public void testOnElementBufferingDiscarding() throws Exception { // Test basic execution of a trigger using a non-combining window set and discarding mode. - MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); + MetricsContainerImpl container = new MetricsContainerImpl("any"); + MetricsEnvironment.setCurrentContainer(container); ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100), @@ -170,8 +171,7 @@ public class ReduceFnRunnerTest { // This element shouldn't be seen, because the trigger has finished injectElement(tester, 4); - long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( - MetricName.named(ReduceFnRunner.class, + long droppedElements = container.getCounter(MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); assertEquals(1, droppedElements); @@ -423,7 +423,8 @@ public class ReduceFnRunnerTest { @Test public void testWatermarkHoldAndLateData() throws Exception { - MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); + MetricsContainerImpl container = new MetricsContainerImpl("any"); + MetricsEnvironment.setCurrentContainer(container); // Test handling of late data. Specifically, ensure the watermark hold is correct. ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, @@ -452,7 +453,7 @@ public class ReduceFnRunnerTest { // Holding for the end-of-window transition. assertEquals(new Instant(9), tester.getWatermarkHold()); // Nothing dropped. - long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( + long droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -514,7 +515,7 @@ public class ReduceFnRunnerTest { // Because we're about to expire the window, we output it. when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); injectElement(tester, 8); - droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( + droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -1083,7 +1084,8 @@ public class ReduceFnRunnerTest { */ @Test public void testDropDataMultipleWindowsFinishedTrigger() throws Exception { - MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); + MetricsContainerImpl container = new MetricsContainerImpl("any"); + MetricsEnvironment.setCurrentContainer(container); ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining( WindowingStrategy.of( SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30))) @@ -1097,7 +1099,7 @@ public class ReduceFnRunnerTest { // assigned to [-30, 70), [0, 100), [30, 130) TimestampedValue.of(12, new Instant(40))); - long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( + long droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -1109,7 +1111,7 @@ public class ReduceFnRunnerTest { // but [-30, 70) is closed by the trigger TimestampedValue.of(14, new Instant(60))); - droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( + droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -1120,7 +1122,7 @@ public class ReduceFnRunnerTest { // but they are all closed tester.injectElements(TimestampedValue.of(16, new Instant(40))); - droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( + droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -1129,7 +1131,8 @@ public class ReduceFnRunnerTest { @Test public void testIdempotentEmptyPanesDiscarding() throws Exception { - MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); + MetricsContainerImpl container = new MetricsContainerImpl("any"); + MetricsEnvironment.setCurrentContainer(container); // Test uninteresting (empty) panes don't increment the index or otherwise // modify PaneInfo. ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = @@ -1170,7 +1173,7 @@ public class ReduceFnRunnerTest { assertTrue(tester.isMarkedFinished(firstWindow)); tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); - long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( + long droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -1179,7 +1182,8 @@ public class ReduceFnRunnerTest { @Test public void testIdempotentEmptyPanesAccumulating() throws Exception { - MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); + MetricsContainerImpl container = new MetricsContainerImpl("any"); + MetricsEnvironment.setCurrentContainer(container); // Test uninteresting (empty) panes don't increment the index or otherwise // modify PaneInfo. ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = @@ -1222,7 +1226,7 @@ public class ReduceFnRunnerTest { assertTrue(tester.isMarkedFinished(firstWindow)); tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); - long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( + long droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index 76351e4..5172f43 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -25,10 +25,10 @@ import static org.mockito.Mockito.when; import com.google.common.base.MoreObjects; import java.util.Collections; import org.apache.beam.runners.core.BaseExecutionContext.StepContext; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; @@ -93,7 +93,8 @@ public class StatefulDoFnRunnerTest { @Test public void testLateDropping() throws Exception { - MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); + MetricsContainerImpl container = new MetricsContainerImpl("any"); + MetricsEnvironment.setCurrentContainer(container); timerInternals.advanceInputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE)); timerInternals.advanceOutputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE)); @@ -117,9 +118,8 @@ public class StatefulDoFnRunnerTest { WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING)); - long droppedValues = MetricsEnvironment.getCurrentContainer().getCounter( - MetricName.named(StatefulDoFnRunner.class, - StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER)).getCumulative().longValue(); + long droppedValues = container.getCounter(MetricName.named(StatefulDoFnRunner.class, + StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER)).getCumulative().longValue(); assertEquals(1L, droppedValues); runner.finishBundle(); http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/CounterCellTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/CounterCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/CounterCellTest.java new file mode 100644 index 0000000..12906d9 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/CounterCellTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.metrics.MetricName; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link CounterCell}. + */ +@RunWith(JUnit4.class) +public class CounterCellTest { + + private CounterCell cell = new CounterCell(MetricName.named("hello", "world")); + + @Test + public void testDeltaAndCumulative() { + cell.inc(5); + cell.inc(7); + assertThat(cell.getCumulative(), equalTo(12L)); + assertThat("getCumulative is idempotent", cell.getCumulative(), equalTo(12L)); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + assertThat(cell.getCumulative(), equalTo(12L)); + + cell.inc(30); + assertThat(cell.getCumulative(), equalTo(42L)); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DirtyStateTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DirtyStateTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DirtyStateTest.java new file mode 100644 index 0000000..e5c8898 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DirtyStateTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.metrics; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DirtyStateTest}. + */ +@RunWith(JUnit4.class) +public class DirtyStateTest { + + private final DirtyState dirty = new DirtyState(); + + @Test + public void basicPath() { + assertThat("Should start dirty", dirty.beforeCommit(), is(true)); + dirty.afterCommit(); + assertThat("Should be clean after commit", dirty.beforeCommit(), is(false)); + + dirty.afterModification(); + assertThat("Should be dirty after change", dirty.beforeCommit(), is(true)); + dirty.afterCommit(); + assertThat("Should be clean after commit", dirty.beforeCommit(), is(false)); + } + + @Test + public void changeAfterBeforeCommit() { + assertThat("Should start dirty", dirty.beforeCommit(), is(true)); + dirty.afterModification(); + dirty.afterCommit(); + assertThat("Changes after beforeCommit should be dirty after afterCommit", + dirty.beforeCommit(), is(true)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java new file mode 100644 index 0000000..c0b045a --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/DistributionCellTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.metrics.MetricName; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DistributionCell}. + */ +@RunWith(JUnit4.class) +public class DistributionCellTest { + private DistributionCell cell = new DistributionCell(MetricName.named("hello", "world")); + + @Test + public void testDeltaAndCumulative() { + cell.update(5); + cell.update(7); + assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7))); + assertThat("getCumulative is idempotent", + cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7))); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + + cell.update(30); + assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 30))); + + assertThat("Adding a new value made the cell dirty", + cell.getDirty().beforeCommit(), equalTo(true)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GaugeCellTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GaugeCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GaugeCellTest.java new file mode 100644 index 0000000..cfaf92d --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GaugeCellTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.metrics.MetricName; +import org.junit.Test; + +/** + * Tests for {@link GaugeCell}. + */ +public class GaugeCellTest { + private GaugeCell cell = new GaugeCell(MetricName.named("hello", "world")); + + @Test + public void testDeltaAndCumulative() { + cell.set(5); + cell.set(7); + assertThat(cell.getCumulative().value(), equalTo(GaugeData.create(7).value())); + assertThat("getCumulative is idempotent", + cell.getCumulative().value(), equalTo(7L)); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + + cell.set(30); + assertThat(cell.getCumulative().value(), equalTo(30L)); + + assertThat("Adding a new value made the cell dirty", + cell.getDirty().beforeCommit(), equalTo(true)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricUpdateMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricUpdateMatchers.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricUpdateMatchers.java new file mode 100644 index 0000000..a5ee874 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricUpdateMatchers.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.metrics; + +import java.util.Objects; +import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matchers for {@link MetricUpdate}. + */ +public class MetricUpdateMatchers { + + /** + * Matches a {@link MetricUpdate} with the given name and contents. + * + * <p>Visible since it may be used in runner-specific tests. + */ + public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, final T update) { + return new TypeSafeMatcher<MetricUpdate<T>>() { + @Override + protected boolean matchesSafely(MetricUpdate<T> item) { + return Objects.equals(name, item.getKey().metricName().name()) + && Objects.equals(update, item.getUpdate()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricUpdate{name=").appendValue(name) + .appendText(", update=").appendValue(update) + .appendText("}"); + } + }; + } + + /** + * Matches a {@link MetricUpdate} with the given namespace, name, step and contents. + * + * <p>Visible since it may be used in runner-specific tests. + */ + public static <T> Matcher<MetricUpdate<T>> metricUpdate( + final String namespace, final String name, final String step, final T update) { + return new TypeSafeMatcher<MetricUpdate<T>>() { + @Override + protected boolean matchesSafely(MetricUpdate<T> item) { + return Objects.equals(namespace, item.getKey().metricName().namespace()) + && Objects.equals(name, item.getKey().metricName().name()) + && Objects.equals(step, item.getKey().stepName()) + && Objects.equals(update, item.getUpdate()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricUpdate{inNamespace=").appendValue(namespace) + .appendText(", name=").appendValue(name) + .appendText(", step=").appendValue(step) + .appendText(", update=").appendValue(update) + .appendText("}"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java new file mode 100644 index 0000000..b304d3b --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.metrics; + +import static org.apache.beam.runners.core.metrics.MetricUpdateMatchers.metricUpdate; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.metrics.MetricName; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link MetricsContainerImpl}. + */ +@RunWith(JUnit4.class) +public class MetricsContainerImplTest { + + @Test + public void testCounterDeltas() { + MetricsContainerImpl container = new MetricsContainerImpl("step1"); + CounterCell c1 = container.getCounter(MetricName.named("ns", "name1")); + CounterCell c2 = container.getCounter(MetricName.named("ns", "name2")); + assertThat("All counters should start out dirty", + container.getUpdates().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 0L), + metricUpdate("name2", 0L))); + container.commitUpdates(); + assertThat("After commit no counters should be dirty", + container.getUpdates().counterUpdates(), emptyIterable()); + + c1.inc(5L); + c2.inc(4L); + + assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 5L), + metricUpdate("name2", 4L))); + + assertThat("Since we haven't committed, updates are still included", + container.getUpdates().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 5L), + metricUpdate("name2", 4L))); + + container.commitUpdates(); + assertThat("After commit there are no updates", + container.getUpdates().counterUpdates(), emptyIterable()); + + c1.inc(8L); + assertThat(container.getUpdates().counterUpdates(), contains( + metricUpdate("name1", 13L))); + } + + @Test + public void testCounterCumulatives() { + MetricsContainerImpl container = new MetricsContainerImpl("step1"); + CounterCell c1 = container.getCounter(MetricName.named("ns", "name1")); + CounterCell c2 = container.getCounter(MetricName.named("ns", "name2")); + c1.inc(2L); + c2.inc(4L); + c1.inc(3L); + + container.getUpdates(); + container.commitUpdates(); + assertThat("Committing updates shouldn't affect cumulative counter values", + container.getCumulative().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 5L), + metricUpdate("name2", 4L))); + + c1.inc(8L); + assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 13L), + metricUpdate("name2", 4L))); + } + + @Test + public void testDistributionDeltas() { + MetricsContainerImpl container = new MetricsContainerImpl("step1"); + DistributionCell c1 = container.getDistribution(MetricName.named("ns", "name1")); + DistributionCell c2 = container.getDistribution(MetricName.named("ns", "name2")); + + assertThat("Initial update includes initial zero-values", + container.getUpdates().distributionUpdates(), containsInAnyOrder( + metricUpdate("name1", DistributionData.EMPTY), + metricUpdate("name2", DistributionData.EMPTY))); + + container.commitUpdates(); + assertThat("No updates after commit", + container.getUpdates().distributionUpdates(), emptyIterable()); + + c1.update(5L); + c2.update(4L); + + assertThat(container.getUpdates().distributionUpdates(), containsInAnyOrder( + metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), + metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); + assertThat("Updates stay the same without commit", + container.getUpdates().distributionUpdates(), containsInAnyOrder( + metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), + metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); + + container.commitUpdates(); + assertThat("No updatess after commit", + container.getUpdates().distributionUpdates(), emptyIterable()); + + c1.update(8L); + c1.update(4L); + assertThat(container.getUpdates().distributionUpdates(), contains( + metricUpdate("name1", DistributionData.create(17, 3, 4, 8)))); + container.commitUpdates(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java new file mode 100644 index 0000000..3074cc0 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.metrics; + +import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; +import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asMetricResults; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult; +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertThat; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.Gauge; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.hamcrest.collection.IsIterableWithSize; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Tests for {@link MetricsContainerStepMap}. + */ +public class MetricsContainerStepMapTest { + + private static final String NAMESPACE = MetricsContainerStepMapTest.class.getName(); + private static final String STEP1 = "myStep1"; + private static final String STEP2 = "myStep2"; + private static final String COUNTER_NAME = "myCounter"; + private static final String DISTRIBUTION_NAME = "myDistribution"; + private static final String GAUGE_NAME = "myGauge"; + + private static final long VALUE = 100; + + private static final Counter counter = + Metrics.counter(MetricsContainerStepMapTest.class, COUNTER_NAME); + private static final Distribution distribution = + Metrics.distribution(MetricsContainerStepMapTest.class, DISTRIBUTION_NAME); + private static final Gauge gauge = + Metrics.gauge(MetricsContainerStepMapTest.class, GAUGE_NAME); + + private static final MetricsContainerImpl metricsContainer; + + static { + metricsContainer = new MetricsContainerImpl(null); + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { + counter.inc(VALUE); + distribution.update(VALUE); + distribution.update(VALUE * 2); + gauge.set(VALUE); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testAttemptedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + + MetricResults metricResults = + asAttemptedOnlyMetricResults(attemptedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + assertIterableSize(step1res.counters(), 1); + assertIterableSize(step1res.distributions(), 1); + assertIterableSize(step1res.gauges(), 1); + + assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, false); + assertDistribution(DISTRIBUTION_NAME, + step1res, STEP1, DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), + false); + assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false); + + MetricQueryResults step2res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); + + assertIterableSize(step2res.counters(), 1); + assertIterableSize(step2res.distributions(), 1); + assertIterableSize(step2res.gauges(), 1); + + assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 2, false); + assertDistribution( + DISTRIBUTION_NAME, step2res, STEP2, + DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), false); + assertGauge( + GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); + + MetricQueryResults allres = + metricResults.queryMetrics(MetricsFilter.builder().build()); + + assertIterableSize(allres.counters(), 2); + assertIterableSize(allres.distributions(), 2); + assertIterableSize(allres.gauges(), 2); + } + + @Test + public void testCounterCommittedUnsupportedInAttemptedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + MetricResults metricResults = + asAttemptedOnlyMetricResults(attemptedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage("This runner does not currently support committed metrics results."); + + assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, true); + } + + @Test + public void testDistributionCommittedUnsupportedInAttemptedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + MetricResults metricResults = + asAttemptedOnlyMetricResults(attemptedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage("This runner does not currently support committed metrics results."); + + assertDistribution(DISTRIBUTION_NAME, step1res, STEP1, DistributionResult.ZERO, true); + } + + @Test + public void testGaugeCommittedUnsupportedInAttemptedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + MetricResults metricResults = + asAttemptedOnlyMetricResults(attemptedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage("This runner does not currently support committed metrics results."); + + assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.empty(), true); + } + + @Test + public void testAttemptedAndCommittedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + attemptedMetrics.update(STEP1, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + + MetricsContainerStepMap committedMetrics = new MetricsContainerStepMap(); + committedMetrics.update(STEP1, metricsContainer); + committedMetrics.update(STEP2, metricsContainer); + committedMetrics.update(STEP2, metricsContainer); + + MetricResults metricResults = + asMetricResults(attemptedMetrics, committedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + assertIterableSize(step1res.counters(), 1); + assertIterableSize(step1res.distributions(), 1); + assertIterableSize(step1res.gauges(), 1); + + assertCounter(COUNTER_NAME, step1res, STEP1, VALUE * 2, false); + assertDistribution( + DISTRIBUTION_NAME, step1res, STEP1, + DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), false); + assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false); + + assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, true); + assertDistribution(DISTRIBUTION_NAME, step1res, STEP1, + DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), true); + assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), true); + + MetricQueryResults step2res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); + + assertIterableSize(step2res.counters(), 1); + assertIterableSize(step2res.distributions(), 1); + assertIterableSize(step2res.gauges(), 1); + + assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 3, false); + assertDistribution(DISTRIBUTION_NAME, step2res, STEP2, + DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2), false); + assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); + + assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 2, true); + assertDistribution(DISTRIBUTION_NAME, step2res, STEP2, + DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), true); + assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), true); + + MetricQueryResults allres = + metricResults.queryMetrics(MetricsFilter.builder().build()); + + assertIterableSize(allres.counters(), 2); + assertIterableSize(allres.distributions(), 2); + assertIterableSize(allres.gauges(), 2); + } + + private <T> void assertIterableSize(Iterable<T> iterable, int size) { + assertThat(iterable, IsIterableWithSize.<T>iterableWithSize(size)); + } + + private void assertCounter( + String name, + MetricQueryResults metricQueryResults, + String step, + Long expected, + boolean isCommitted) { + assertThat( + metricQueryResults.counters(), + hasItem(metricsResult(NAMESPACE, name, step, expected, isCommitted))); + } + + private void assertDistribution( + String name, + MetricQueryResults metricQueryResults, + String step, + DistributionResult expected, + boolean isCommitted) { + assertThat( + metricQueryResults.distributions(), + hasItem(metricsResult(NAMESPACE, name, step, expected, isCommitted))); + } + + private void assertGauge( + String name, + MetricQueryResults metricQueryResults, + String step, + GaugeResult expected, + boolean isCommitted) { + assertThat( + metricQueryResults.gauges(), + hasItem(metricsResult(NAMESPACE, name, step, expected, isCommitted))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsMapTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsMapTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsMapTest.java new file mode 100644 index 0000000..96210dd --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsMapTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.metrics; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThat; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +/** + * Tests for {@link MetricsMap}. + */ +@RunWith(JUnit4.class) +public class MetricsMapTest { + + public MetricsMap<String, AtomicLong> metricsMap = + new MetricsMap<>(new MetricsMap.Factory<String, AtomicLong>() { + @Override + public AtomicLong createInstance(String unusedKey) { + return new AtomicLong(); + } + }); + + @Test + public void testCreateSeparateInstances() { + AtomicLong foo = metricsMap.get("foo"); + AtomicLong bar = metricsMap.get("bar"); + + assertThat(foo, not(sameInstance(bar))); + } + + @Test + public void testReuseInstances() { + AtomicLong foo1 = metricsMap.get("foo"); + AtomicLong foo2 = metricsMap.get("foo"); + + assertThat(foo1, sameInstance(foo2)); + } + + @Test + public void testGet() { + assertThat(metricsMap.tryGet("foo"), nullValue(AtomicLong.class)); + + AtomicLong foo = metricsMap.get("foo"); + assertThat(metricsMap.tryGet("foo"), sameInstance(foo)); + } + + @Test + public void testGetEntries() { + AtomicLong foo = metricsMap.get("foo"); + AtomicLong bar = metricsMap.get("bar"); + assertThat(metricsMap.entries(), containsInAnyOrder( + hasEntry("foo", foo), + hasEntry("bar", bar))); + } + + private static Matcher<Map.Entry<String, AtomicLong>> hasEntry( + final String key, final AtomicLong value) { + return new TypeSafeMatcher<Entry<String, AtomicLong>>() { + + @Override + public void describeTo(Description description) { + description + .appendText("Map.Entry{key=").appendValue(key) + .appendText(", value=").appendValue(value) + .appendText("}"); + } + + @Override + protected boolean matchesSafely(Entry<String, AtomicLong> item) { + return Objects.equals(key, item.getKey()) + && Objects.equals(value, item.getValue()); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index b7cd6e7..8169332 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -32,20 +32,20 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.runners.core.metrics.DistributionData; +import org.apache.beam.runners.core.metrics.GaugeData; +import org.apache.beam.runners.core.metrics.MetricFiltering; +import org.apache.beam.runners.core.metrics.MetricKey; +import org.apache.beam.runners.core.metrics.MetricUpdates; +import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; +import org.apache.beam.runners.core.metrics.MetricsMap; import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.GaugeData; import org.apache.beam.sdk.metrics.GaugeResult; -import org.apache.beam.sdk.metrics.MetricFiltering; -import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricUpdates; -import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; import org.apache.beam.sdk.metrics.MetricsFilter; -import org.apache.beam.sdk.metrics.MetricsMap; /** * Implementation of {@link MetricResults} for the Direct Runner. http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index c7f7847..6efb99f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -23,9 +23,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.Set; +import org.apache.beam.runners.core.metrics.MetricUpdates; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; -import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index 56f8650..76d817b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -21,8 +21,8 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; -import org.apache.beam.sdk.metrics.MetricUpdates; -import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.runners.core.metrics.MetricUpdates; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.util.WindowedValue; @@ -92,7 +92,7 @@ class TransformExecutor<T> implements Runnable { @Override public void run() { - MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName()); + MetricsContainerImpl metricsContainer = new MetricsContainerImpl(transform.getFullName()); try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { Collection<ModelEnforcement<T>> enforcements = new ArrayList<>(); for (ModelEnforcementFactory enforcementFactory : modelEnforcements) { @@ -134,7 +134,7 @@ class TransformExecutor<T> implements Runnable { */ private void processElements( TransformEvaluator<T> evaluator, - MetricsContainer metricsContainer, + MetricsContainerImpl metricsContainer, Collection<ModelEnforcement<T>> enforcements) throws Exception { if (inputBundle != null) { @@ -167,7 +167,7 @@ class TransformExecutor<T> implements Runnable { * {@link TransformEvaluator#finishBundle()} */ private TransformResult<T> finishBundle( - TransformEvaluator<T> evaluator, MetricsContainer metricsContainer, + TransformEvaluator<T> evaluator, MetricsContainerImpl metricsContainer, Collection<ModelEnforcement<T>> enforcements) throws Exception { TransformResult<T> result = http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index 8d7aeda..554c2d6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -19,9 +19,9 @@ package org.apache.beam.runners.direct; import java.util.Set; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.MetricUpdates; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; -import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java index d5d0aff..8cdf323 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -18,23 +18,23 @@ package org.apache.beam.runners.direct; -import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; -import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult; import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.committedMetricsResult; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; -import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.runners.core.metrics.DistributionData; +import org.apache.beam.runners.core.metrics.GaugeData; +import org.apache.beam.runners.core.metrics.MetricKey; +import org.apache.beam.runners.core.metrics.MetricUpdates; +import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.GaugeData; import org.apache.beam.sdk.metrics.GaugeResult; -import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricUpdates; -import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; import org.apache.beam.sdk.metrics.MetricsFilter; import org.joda.time.Instant; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 038895a..b1f956d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -17,15 +17,15 @@ */ package org.apache.beam.runners.flink; -import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; +import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; import java.io.IOException; import java.util.Collections; import java.util.Map; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.joda.time.Duration; /** http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java index 40191d2..ccce127 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.metrics; import java.io.Closeable; import java.io.IOException; import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -29,7 +30,7 @@ import org.joda.time.Instant; /** * {@link DoFnRunner} decorator which registers - * {@link org.apache.beam.sdk.metrics.MetricsContainer}. It updates metrics to Flink metrics and + * {@link MetricsContainerImpl}. It updates metrics to Flink metrics and * accumulators in {@link #finishBundle()}. */ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java index f81205e..677051e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -17,17 +17,18 @@ */ package org.apache.beam.runners.flink.metrics; -import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; +import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; import java.util.HashMap; import java.util.Map; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.RuntimeContext; @@ -37,7 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Helper class for holding a {@link MetricsContainer} and forwarding Beam metrics to + * Helper class for holding a {@link MetricsContainerImpl} and forwarding Beam metrics to * Flink accumulators and metrics. */ public class FlinkMetricContainer { http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java index a9dc2ce..d8b5d25 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.flink.metrics; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.SimpleAccumulator; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java index 64738cc..85d6426 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.metrics; import java.io.Closeable; import java.io.IOException; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.metrics.MetricsEnvironment; @@ -26,8 +27,8 @@ import org.apache.beam.sdk.options.PipelineOptions; /** * Util for invoking {@link Source.Reader} methods that might require a - * {@link org.apache.beam.sdk.metrics.MetricsContainer} to be active. - * Source.Reader decorator which registers {@link org.apache.beam.sdk.metrics.MetricsContainer}. + * {@link MetricsContainerImpl} to be active. + * Source.Reader decorator which registers {@link MetricsContainerImpl}. * It update metrics to Flink metric and accumulator in start and advance. */ public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT>> { http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 3809335..f679976 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -33,7 +33,7 @@ <packaging>jar</packaging> <properties> - <dataflow.container_version>beam-master-20170508-pr2933</dataflow.container_version> + <dataflow.container_version>beam-master-20170510</dataflow.container_version> <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version> <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version> </properties> http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index aa80959..a6a6a43 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -28,10 +28,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import org.apache.beam.runners.core.metrics.MetricFiltering; +import org.apache.beam.runners.core.metrics.MetricKey; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; -import org.apache.beam.sdk.metrics.MetricFiltering; -import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index bca3428..85a0979 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -17,8 +17,8 @@ */ package org.apache.beam.runners.dataflow; -import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; -import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.committedMetricsResult; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index 3986e33..7fd6962 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.spark; -import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; +import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; import java.io.IOException; import java.util.Objects; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index aa89c59..01cc176 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -28,13 +28,13 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.Accumulator; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index 2a9de4b..7106c73 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.Serializable; import java.util.Collections; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; @@ -36,7 +37,6 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java index 1dcfa2f..03b2187 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java @@ -21,10 +21,10 @@ package org.apache.beam.runners.spark.metrics; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import java.io.IOException; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.translation.streaming.Checkpoint; import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.Accumulator; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java index dee4ebc..17cf71f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.spark.metrics; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.spark.AccumulatorParam; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java index e4bd598..5e67445 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java @@ -18,19 +18,19 @@ package org.apache.beam.runners.spark.metrics; -import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; +import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; import com.codahale.metrics.Metric; import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsFilter; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 37d9635..1d843f9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -31,6 +31,8 @@ import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; import org.apache.beam.runners.core.construction.Triggers; +import org.apache.beam.runners.core.metrics.CounterCell; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.spark.SparkPipelineOptions; @@ -43,9 +45,7 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.metrics.CounterCell; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -207,7 +207,7 @@ public class SparkGroupAlsoByWindowViaWindowSet { new OutputWindowedValueHolder<>(); // use in memory Aggregators since Spark Accumulators are not resilient // in stateful operators, once done with this partition. - final MetricsContainer cellProvider = new MetricsContainer("cellProvider"); + final MetricsContainerImpl cellProvider = new MetricsContainerImpl("cellProvider"); final CounterCell droppedDueToClosedWindow = cellProvider.getCounter( MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER)); @@ -321,12 +321,12 @@ public class SparkGroupAlsoByWindowViaWindowSet { long lateDropped = droppedDueToLateness.getCumulative(); if (lateDropped > 0) { LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped)); - droppedDueToLateness.update(-droppedDueToLateness.getCumulative()); + droppedDueToLateness.inc(-droppedDueToLateness.getCumulative()); } long closedWindowDropped = droppedDueToClosedWindow.getCumulative(); if (closedWindowDropped > 0) { LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped)); - droppedDueToClosedWindow.update(-droppedDueToClosedWindow.getCumulative()); + droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative()); } return scala.collection.JavaConversions.asScalaIterator(outIter); http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index 17a3c73..549bd30 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -27,6 +27,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.EmptyCheckpointMark; import org.apache.beam.runners.spark.io.MicrobatchSource; @@ -36,7 +37,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java index 8349b09..394b80b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java @@ -21,8 +21,9 @@ package org.apache.beam.runners.spark.translation; import java.io.Closeable; import java.io.IOException; import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -32,7 +33,7 @@ import org.joda.time.Instant; /** - * DoFnRunner decorator which registers {@link org.apache.beam.sdk.metrics.MetricsContainer}. + * DoFnRunner decorator which registers {@link MetricsContainerImpl}. */ class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { private final DoFnRunner<InputT, OutputT> delegate; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index ecf96b6..4a66541 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -27,10 +27,10 @@ import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index acbac32..b2ed3a9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -41,7 +42,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.Create; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index f736e53..acb4a02 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.Nonnull; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -58,7 +59,6 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.DoFn; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java index 86f35ba..5728fa0 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java @@ -24,8 +24,8 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.util.NoSuchElementException; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.metrics.MetricsContainer; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -91,7 +91,7 @@ public class ReaderToIteratorAdapterTest { private final TestReader testReader = new TestReader(); private final SourceRDD.Bounded.ReaderToIteratorAdapter<Integer> readerIterator = - new SourceRDD.Bounded.ReaderToIteratorAdapter<>(new MetricsContainer(""), testReader); + new SourceRDD.Bounded.ReaderToIteratorAdapter<>(new MetricsContainerImpl(""), testReader); private void assertReaderRange(final int start, final int end) { for (int i = start; i < end; i++) { http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 584edac..254a318 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.spark.translation.streaming; -import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java index df6027c..cafd539 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.spark.translation.streaming; -import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; +import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertThat;
