Delegate getAggregators() in various DoFn adapters
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0d46c2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0d46c2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0d46c2d Branch: refs/heads/master Commit: b0d46c2deb4318f8d0e55eeeb20e1d11ceadd218 Parents: 6fa8057 Author: Kenneth Knowles <k...@google.com> Authored: Thu Nov 17 15:50:17 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Nov 18 15:09:43 2016 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/beam/sdk/transforms/DoFn.java | 7 ++++++- .../org/apache/beam/sdk/transforms/DoFnAdapters.java | 6 ++++++ .../java/org/apache/beam/sdk/transforms/OldDoFn.java | 7 ++++++- .../java/org/apache/beam/sdk/transforms/OldDoFnTest.java | 11 +++++++---- 4 files changed, 25 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 9978ef4..221d942 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -28,6 +28,8 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; @@ -236,7 +238,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD aggregator.setDelegate(delegate); } - } /** @@ -298,6 +299,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD protected Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>(); + Collection<Aggregator<?, ?>> getAggregators() { + return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values()); + } + /** * Protects aggregators from being created after initialization. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index a3466bb..1a74ae7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import java.io.IOException; +import java.util.Collection; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn.Context; @@ -244,6 +245,11 @@ public class DoFnAdapters { } @Override + Collection<Aggregator<?, ?>> getAggregators() { + return fn.getAggregators(); + } + + @Override public Duration getAllowedTimestampSkew() { return fn.getAllowedTimestampSkew(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index f16e0b3..9bf9003 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -676,6 +676,11 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl } @Override + Collection<Aggregator<?, ?>> getAggregators() { + return OldDoFn.this.getAggregators(); + } + + @Override protected TypeDescriptor<OutputT> getOutputTypeDescriptor() { return OldDoFn.this.getOutputTypeDescriptor(); } @@ -683,7 +688,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl /** * A {@link ProcessContext} for an {@link OldDoFn} that implements - * {@link OldDoFn.RequiresWindowAcccess}, via a context for a proper {@link DoFn}. + * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}. */ private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java index e7ae135..07e3078 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; -import com.google.common.collect.ImmutableMap; import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.AggregatorValues; @@ -37,6 +36,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Max.MaxIntegerFn; import org.apache.beam.sdk.transforms.Sum.SumIntegerFn; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -216,14 +216,17 @@ public class OldDoFnTest implements Serializable { Pipeline pipeline = TestPipeline.create(); CountOddsFn countOdds = new CountOddsFn(); - pipeline + PCollection<Void> output = pipeline .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100)) .apply(ParDo.of(countOdds)); PipelineResult result = pipeline.run(); AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator); - assertThat(values.getValuesAtSteps(), - equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4))); + + Map<String, Integer> valuesMap = values.getValuesAtSteps(); + + assertThat(valuesMap.size(), equalTo(1)); + assertThat(valuesMap.get(output.getProducingTransformInternal().getFullName()), equalTo(4)); } private static class CountOddsFn extends OldDoFn<Integer, Void> {