Removing Aggregators from runner-specific examples and tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fe11d12 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fe11d12 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fe11d12 Branch: refs/heads/master Commit: 1fe11d12404b53d978e70ee2c7c37582dcad10c9 Parents: 904b413 Author: Pablo <[email protected]> Authored: Tue Mar 7 13:03:27 2017 -0800 Committer: bchambers <[email protected]> Committed: Tue Apr 25 12:45:33 2017 -0700 ---------------------------------------------------------------------- .../beam/examples/DebuggingWordCount.java | 2 +- .../runners/apex/examples/WordCountTest.java | 9 +- .../core/DoFnDelegatingAggregatorTest.java | 144 -------- .../beam/runners/core/OldDoFnContextTest.java | 72 ---- .../apache/beam/runners/core/OldDoFnTest.java | 32 -- .../beam/runners/direct/DirectRunner.java | 3 - .../dataflow/DataflowPipelineJobTest.java | 337 ------------------- .../beam/runners/spark/examples/WordCount.java | 13 +- 8 files changed, 11 insertions(+), 601 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index 4c82f46..e6e3a92 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -95,7 +95,7 @@ public class DebuggingWordCount { * in a dashboard, etc. */ private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords"); - private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unMatchedWords"); + private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords"); @ProcessElement public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java index a1713ac..b980715 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java @@ -30,15 +30,15 @@ import org.apache.beam.runners.apex.TestApexRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; @@ -67,13 +67,12 @@ public class WordCountTest { static class ExtractWordsFn extends DoFn<String, String> { private static final long serialVersionUID = 1L; - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", Sum.ofLongs()); + private final Counter emptyLines = Metrics.counter("main", "emptyLines"); @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); + emptyLines.inc(1); } // Split the line into words. http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java deleted file mode 100644 index b44e8a4..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DelegatingAggregator; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link DelegatingAggregator}. - */ -@RunWith(JUnit4.class) -public class DoFnDelegatingAggregatorTest { - - @Mock - private Aggregator<Long, Long> delegate; - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testAddValueWithoutDelegateThrowsException() { - OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); - - String name = "agg"; - CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); - - DelegatingAggregator<Double, Double> aggregator = - (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("cannot be called"); - thrown.expectMessage("DoFn"); - - aggregator.addValue(21.2); - } - - @Test - public void testSetDelegateThenAddValueCallsDelegate() { - String name = "agg"; - CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class); - - OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); - - DelegatingAggregator<Long, Long> aggregator = - (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner); - - aggregator.setDelegate(delegate); - - aggregator.addValue(12L); - - verify(delegate).addValue(12L); - } - - @Test - public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() { - String name = "agg"; - CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); - - OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); - - DelegatingAggregator<Double, Double> aggregator = - (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); - - @SuppressWarnings("unchecked") - Aggregator<Double, Double> secondDelegate = - mock(Aggregator.class, "secondDelegate"); - - aggregator.setDelegate(aggregator); - aggregator.setDelegate(secondDelegate); - - aggregator.addValue(2.25); - - verify(secondDelegate).addValue(2.25); - verify(delegate, never()).addValue(anyLong()); - } - - @Test - public void testGetNameReturnsName() { - String name = "agg"; - CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); - - OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); - - DelegatingAggregator<Double, Double> aggregator = - (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); - - assertEquals(name, aggregator.getName()); - } - - @Test - public void testGetCombineFnReturnsCombineFn() { - String name = "agg"; - CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); - - OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); - - DelegatingAggregator<Double, Double> aggregator = - (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); - - assertEquals(combiner, aggregator.getCombineFn()); - } - - @SuppressWarnings("unchecked") - private static <T> CombineFn<T, ?, T> mockCombineFn( - @SuppressWarnings("unused") Class<T> clazz) { - return mock(CombineFn.class); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java deleted file mode 100644 index a1cd49d..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Sum; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link OldDoFn.Context}. - */ -@RunWith(JUnit4.class) -public class OldDoFnContextTest { - - @Mock - private Aggregator<Long, Long> agg; - - private OldDoFn<Object, Object> fn; - private OldDoFn<Object, Object>.Context context; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - - // Need to be real objects to call the constructor, and to reference the - // outer instance of OldDoFn - NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>(); - OldDoFn<Object, Object>.Context noOpContext = noOpFn.context(); - - fn = spy(noOpFn); - context = spy(noOpContext); - } - - @Test - public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() { - Combine.BinaryCombineLongFn combiner = Sum.ofLongs(); - Aggregator<Long, Long> delegateAggregator = - fn.createAggregator("test", combiner); - - when(context.createAggregatorInternal("test", combiner)).thenReturn(agg); - - context.setupDelegateAggregators(); - delegateAggregator.addValue(1L); - - verify(agg).addValue(1L); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java index 425de07..d6838e2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java @@ -19,14 +19,11 @@ package org.apache.beam.runners.core; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.isA; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import java.io.Serializable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -50,19 +47,6 @@ public class OldDoFnTest implements Serializable { public transient ExpectedException thrown = ExpectedException.none(); @Test - public void testCreateAggregatorWithCombinerSucceeds() { - String name = "testAggregator"; - Combine.BinaryCombineLongFn combiner = Sum.ofLongs(); - - OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); - - Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner); - - assertEquals(name, aggregator.getName()); - assertEquals(combiner, aggregator.getCombineFn()); - } - - @Test public void testCreateAggregatorWithNullNameThrowsException() { thrown.expect(NullPointerException.class); thrown.expectMessage("name cannot be null"); @@ -114,22 +98,6 @@ public class OldDoFnTest implements Serializable { } @Test - public void testCreateAggregatorsWithDifferentNamesSucceeds() { - String nameOne = "testAggregator"; - String nameTwo = "aggregatorPrime"; - CombineFn<Double, ?, Double> combiner = Max.ofDoubles(); - - OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); - - Aggregator<Double, Double> aggregatorOne = - doFn.createAggregator(nameOne, combiner); - Aggregator<Double, Double> aggregatorTwo = - doFn.createAggregator(nameTwo, combiner); - - assertNotEquals(aggregatorOne, aggregatorTwo); - } - - @Test public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception { OldDoFn<String, String> fn = new OldDoFn<String, String>() { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 45a04a7..77ec68f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,8 +34,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 9cab5e8..59315a7 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -19,13 +19,8 @@ package org.apache.beam.runners.dataflow; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.eq; @@ -38,36 +33,22 @@ import static org.mockito.Mockito.when; import com.google.api.client.util.NanoClock; import com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get; -import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.GetMetrics; import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages; import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.JobMetrics; -import com.google.api.services.dataflow.model.MetricStructuredName; -import com.google.api.services.dataflow.model.MetricUpdate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSetMultimap; import java.io.IOException; -import java.math.BigDecimal; import java.net.SocketTimeoutException; import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.util.NoopPathValidator; import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.values.PInput; @@ -367,323 +348,6 @@ public class DataflowPipelineJobTest { DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff); } - @Test - public void testGetAggregatorValuesWithNoMetricUpdatesReturnsEmptyValue() - throws IOException, AggregatorRetrievalException { - Aggregator<?, ?> aggregator = mock(Aggregator.class); - @SuppressWarnings("unchecked") - PTransform<PInput, POutput> pTransform = mock(PTransform.class); - String stepName = "s1"; - String fullName = "Foo/Bar/Baz"; - AppliedPTransform<?, ?, ?> appliedTransform = - appliedPTransform(fullName, pTransform, Pipeline.create(options)); - - DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( - ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), - ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); - - GetMetrics getMetrics = mock(GetMetrics.class); - when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics); - JobMetrics jobMetrics = new JobMetrics(); - when(getMetrics.execute()).thenReturn(jobMetrics); - - jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of()); - - Get getState = mock(Get.class); - when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState); - Job modelJob = new Job(); - when(getState.execute()).thenReturn(modelJob); - modelJob.setCurrentState(State.RUNNING.toString()); - - DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); - - AggregatorValues<?> values = job.getAggregatorValues(aggregator); - - assertThat(values.getValues(), empty()); - } - - @Test - public void testGetAggregatorValuesWithNullMetricUpdatesReturnsEmptyValue() - throws IOException, AggregatorRetrievalException { - Aggregator<?, ?> aggregator = mock(Aggregator.class); - @SuppressWarnings("unchecked") - PTransform<PInput, POutput> pTransform = mock(PTransform.class); - String stepName = "s1"; - String fullName = "Foo/Bar/Baz"; - AppliedPTransform<?, ?, ?> appliedTransform = - appliedPTransform(fullName, pTransform, Pipeline.create(options)); - - DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( - ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), - ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); - - GetMetrics getMetrics = mock(GetMetrics.class); - when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics); - JobMetrics jobMetrics = new JobMetrics(); - when(getMetrics.execute()).thenReturn(jobMetrics); - - jobMetrics.setMetrics(null); - - Get getState = mock(Get.class); - when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState); - Job modelJob = new Job(); - when(getState.execute()).thenReturn(modelJob); - modelJob.setCurrentState(State.RUNNING.toString()); - - DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); - - AggregatorValues<?> values = job.getAggregatorValues(aggregator); - - assertThat(values.getValues(), empty()); - } - - @Test - public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection() - throws IOException, AggregatorRetrievalException { - CombineFn<Long, long[], Long> combineFn = Sum.ofLongs(); - String aggregatorName = "agg"; - Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName); - @SuppressWarnings("unchecked") - PTransform<PInput, POutput> pTransform = mock(PTransform.class); - String stepName = "s1"; - String fullName = "Foo/Bar/Baz"; - AppliedPTransform<?, ?, ?> appliedTransform = - appliedPTransform(fullName, pTransform, Pipeline.create(options)); - - DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( - ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), - ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); - - GetMetrics getMetrics = mock(GetMetrics.class); - when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics); - JobMetrics jobMetrics = new JobMetrics(); - when(getMetrics.execute()).thenReturn(jobMetrics); - - MetricUpdate update = new MetricUpdate(); - long stepValue = 1234L; - update.setScalar(new BigDecimal(stepValue)); - - MetricStructuredName structuredName = new MetricStructuredName(); - structuredName.setName(aggregatorName); - structuredName.setContext(ImmutableMap.of("step", stepName)); - update.setName(structuredName); - - jobMetrics.setMetrics(ImmutableList.of(update)); - - Get getState = mock(Get.class); - when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState); - Job modelJob = new Job(); - when(getState.execute()).thenReturn(modelJob); - modelJob.setCurrentState(State.RUNNING.toString()); - - DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); - - AggregatorValues<Long> values = job.getAggregatorValues(aggregator); - - assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue)); - assertThat(values.getValuesAtSteps().size(), equalTo(1)); - assertThat(values.getValues(), contains(stepValue)); - assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue))); - } - - @Test - public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection() - throws IOException, AggregatorRetrievalException { - CombineFn<Long, long[], Long> combineFn = Sum.ofLongs(); - String aggregatorName = "agg"; - Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName); - - Pipeline p = Pipeline.create(options); - - @SuppressWarnings("unchecked") - PTransform<PInput, POutput> pTransform = mock(PTransform.class); - String stepName = "s1"; - String fullName = "Foo/Bar/Baz"; - AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform, p); - - @SuppressWarnings("unchecked") - PTransform<PInput, POutput> otherTransform = mock(PTransform.class); - String otherStepName = "s88"; - String otherFullName = "Spam/Ham/Eggs"; - AppliedPTransform<?, ?, ?> otherAppliedTransform = - appliedPTransform(otherFullName, otherTransform, p); - - DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( - ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of( - aggregator, pTransform, aggregator, otherTransform).asMap(), - ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of( - appliedTransform, stepName, otherAppliedTransform, otherStepName)); - - GetMetrics getMetrics = mock(GetMetrics.class); - when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics); - JobMetrics jobMetrics = new JobMetrics(); - when(getMetrics.execute()).thenReturn(jobMetrics); - - MetricUpdate updateOne = new MetricUpdate(); - long stepValue = 1234L; - updateOne.setScalar(new BigDecimal(stepValue)); - - MetricStructuredName structuredNameOne = new MetricStructuredName(); - structuredNameOne.setName(aggregatorName); - structuredNameOne.setContext(ImmutableMap.of("step", stepName)); - updateOne.setName(structuredNameOne); - - MetricUpdate updateTwo = new MetricUpdate(); - long stepValueTwo = 1024L; - updateTwo.setScalar(new BigDecimal(stepValueTwo)); - - MetricStructuredName structuredNameTwo = new MetricStructuredName(); - structuredNameTwo.setName(aggregatorName); - structuredNameTwo.setContext(ImmutableMap.of("step", otherStepName)); - updateTwo.setName(structuredNameTwo); - - jobMetrics.setMetrics(ImmutableList.of(updateOne, updateTwo)); - - Get getState = mock(Get.class); - when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState); - Job modelJob = new Job(); - when(getState.execute()).thenReturn(modelJob); - modelJob.setCurrentState(State.RUNNING.toString()); - - DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); - - AggregatorValues<Long> values = job.getAggregatorValues(aggregator); - - assertThat(values.getValuesAtSteps(), hasEntry(fullName, stepValue)); - assertThat(values.getValuesAtSteps(), hasEntry(otherFullName, stepValueTwo)); - assertThat(values.getValuesAtSteps().size(), equalTo(2)); - assertThat(values.getValues(), containsInAnyOrder(stepValue, stepValueTwo)); - assertThat(values.getTotalValue(combineFn), equalTo(Long.valueOf(stepValue + stepValueTwo))); - } - - @Test - public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate() - throws IOException, AggregatorRetrievalException { - CombineFn<Long, long[], Long> combineFn = Sum.ofLongs(); - String aggregatorName = "agg"; - Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName); - @SuppressWarnings("unchecked") - PTransform<PInput, POutput> pTransform = mock(PTransform.class); - String stepName = "s1"; - String fullName = "Foo/Bar/Baz"; - AppliedPTransform<?, ?, ?> appliedTransform = - appliedPTransform(fullName, pTransform, Pipeline.create(options)); - - DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( - ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), - ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); - - GetMetrics getMetrics = mock(GetMetrics.class); - when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics); - JobMetrics jobMetrics = new JobMetrics(); - when(getMetrics.execute()).thenReturn(jobMetrics); - - MetricUpdate ignoredUpdate = new MetricUpdate(); - ignoredUpdate.setScalar(null); - - MetricStructuredName ignoredName = new MetricStructuredName(); - ignoredName.setName("ignoredAggregator.elementCount.out0"); - ignoredName.setContext(null); - ignoredUpdate.setName(ignoredName); - - jobMetrics.setMetrics(ImmutableList.of(ignoredUpdate)); - - Get getState = mock(Get.class); - when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState); - Job modelJob = new Job(); - when(getState.execute()).thenReturn(modelJob); - modelJob.setCurrentState(State.RUNNING.toString()); - - DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); - - AggregatorValues<Long> values = job.getAggregatorValues(aggregator); - - assertThat(values.getValuesAtSteps().entrySet(), empty()); - assertThat(values.getValues(), empty()); - } - - @Test - public void testGetAggregatorValuesWithUnusedAggregatorThrowsException() - throws AggregatorRetrievalException { - Aggregator<?, ?> aggregator = mock(Aggregator.class); - - DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( - ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of().asMap(), - ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()); - - DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("not used in this pipeline"); - job.getAggregatorValues(aggregator); - } - - @Test - public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException() - throws IOException, AggregatorRetrievalException { - CombineFn<Long, long[], Long> combineFn = Sum.ofLongs(); - String aggregatorName = "agg"; - Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName); - @SuppressWarnings("unchecked") - PTransform<PInput, POutput> pTransform = mock(PTransform.class); - String stepName = "s1"; - String fullName = "Foo/Bar/Baz"; - AppliedPTransform<?, ?, ?> appliedTransform = - appliedPTransform(fullName, pTransform, Pipeline.create(options)); - - DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms( - ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(), - ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName)); - - GetMetrics getMetrics = mock(GetMetrics.class); - when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics); - IOException cause = new IOException(); - when(getMetrics.execute()).thenThrow(cause); - - Get getState = mock(Get.class); - when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState); - Job modelJob = new Job(); - when(getState.execute()).thenReturn(modelJob); - modelJob.setCurrentState(State.RUNNING.toString()); - - DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms); - - thrown.expect(AggregatorRetrievalException.class); - thrown.expectCause(is(cause)); - thrown.expectMessage(aggregator.toString()); - thrown.expectMessage("when retrieving Aggregator values for"); - job.getAggregatorValues(aggregator); - } - - private static class TestAggregator<InT, OutT> implements Aggregator<InT, OutT> { - private final CombineFn<InT, ?, OutT> combineFn; - private final String name; - - public TestAggregator(CombineFn<InT, ?, OutT> combineFn, String name) { - this.combineFn = combineFn; - this.name = name; - } - - @Override - public void addValue(InT value) { - throw new AssertionError(); - } - - @Override - public String getName() { - return name; - } - - @Override - public CombineFn<InT, ?, OutT> getCombineFn() { - return combineFn; - } - } - private AppliedPTransform<?, ?, ?> appliedPTransform( String fullName, PTransform<PInput, POutput> transform, Pipeline p) { PInput input = mock(PInput.class); @@ -696,7 +360,6 @@ public class DataflowPipelineJobTest { p); } - private static class FastNanoClockAndFuzzySleeper implements NanoClock, Sleeper { private long fastNanoTime; http://git-wip-us.apache.org/repos/asf/beam/blob/1fe11d12/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java index da14ee2..32caa9a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java @@ -19,18 +19,18 @@ package org.apache.beam.runners.spark.examples; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -44,14 +44,13 @@ public class WordCount { * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the * pipeline. */ - static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", Sum.ofLongs()); + public static class ExtractWordsFn extends DoFn<String, String> { + private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines"); @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); + emptyLines.inc(); } // Split the line into words.
