Add setupDelegatingAggregators for DoFn (for now)
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2e751f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2e751f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2e751f4 Branch: refs/heads/master Commit: c2e751f49d72968f2478931cdb884fd4af173610 Parents: 08dd149 Author: Kenneth Knowles <[email protected]> Authored: Fri Oct 21 11:53:29 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Sun Oct 23 19:52:51 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 1 + .../org/apache/beam/sdk/transforms/DoFn.java | 24 ++++++++++++++++++++ 2 files changed, 25 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 0360bc2..1cf56a6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -228,6 +228,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out this.stepContext = stepContext; this.aggregatorFactory = aggregatorFactory; this.windowFn = windowFn; + super.setupDelegateAggregators(); } ////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 0531cbb..11ca853 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -214,6 +214,30 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( String name, CombineFn<AggInputT, ?, AggOutputT> combiner); + + /** + * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context. + * + * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method. + */ + @Experimental(Kind.AGGREGATOR) + protected final void setupDelegateAggregators() { + for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) { + setupDelegateAggregator(aggregator); + } + + aggregatorsAreFinal = true; + } + + private <AggInputT, AggOutputT> void setupDelegateAggregator( + DelegatingAggregator<AggInputT, AggOutputT> aggregator) { + + Aggregator<AggInputT, AggOutputT> delegate = createAggregator( + aggregator.getName(), aggregator.getCombineFn()); + + aggregator.setDelegate(delegate); + } + } /**
