Repository: incubator-beam Updated Branches: refs/heads/master e82c5d224 -> beccdc686
[BEAM-862] Make Aggregator Creation Idempotent The problem was that the DoFnInvoker was invoking createAggregatorForDoFn in the AggregatorFactory several times and Flink only allows adding each aggregator once. This now adds a check for whether an aggregator exists already. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/837bb2b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/837bb2b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/837bb2b7 Branch: refs/heads/master Commit: 837bb2b71e515c9170fa2c031c86a618b085b249 Parents: e82c5d2 Author: Aljoscha Krettek <[email protected]> Authored: Thu Oct 27 15:31:20 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 28 22:48:04 2016 +0200 ---------------------------------------------------------------------- .../translation/wrappers/streaming/DoFnOperator.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/837bb2b7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index fb444e0..a29664b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -174,10 +174,16 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> ExecutionContext.StepContext stepContext, String aggregatorName, Combine.CombineFn<InputT, AccumT, OutputT> combine) { + + @SuppressWarnings("unchecked") SerializableFnAggregatorWrapper<InputT, OutputT> result = - new SerializableFnAggregatorWrapper<>(combine); + (SerializableFnAggregatorWrapper<InputT, OutputT>) + getRuntimeContext().getAccumulator(aggregatorName); - getRuntimeContext().addAccumulator(aggregatorName, result); + if (result == null) { + result = new SerializableFnAggregatorWrapper<>(combine); + getRuntimeContext().addAccumulator(aggregatorName, result); + } return result; } };
