Fix binding of aggregator creating in OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4e185d0b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4e185d0b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4e185d0b Branch: refs/heads/gearpump-runner Commit: 4e185d0b0a7ec4c096380a25b9cbe4703621ec6b Parents: 3094017 Author: Kenneth Knowles <[email protected]> Authored: Fri Oct 21 12:44:30 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Sun Oct 23 21:04:17 2016 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e185d0b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index b269f47..a9f26a4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -511,6 +511,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl public AdaptedContext( DoFn<InputT, OutputT>.Context newContext) { this.newContext = newContext; + super.setupDelegateAggregators(); } @Override @@ -541,7 +542,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl @Override protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { - return null; + return newContext.createAggregator(name, combiner); } } @@ -625,7 +626,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl @Override protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { - return null; + return newContext.createAggregator(name, combiner); } }
