Make DoFnTester aggregator initialization idempotent
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/043ebeca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/043ebeca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/043ebeca

Branch: refs/heads/gearpump-runner
Commit: 043ebecacf7a8e96939b025afa8480c6df2f3b41
Parents: 2ab955d
Author: Kenneth Knowles <[email protected]>
Authored: Fri Oct 21 13:35:29 2016 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Sun Oct 23 21:04:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/043ebeca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 302bb02..7995719 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -543,6 +543,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   private <AinT, AccT, AoutT> Aggregator<AinT, AoutT> aggregator(
       final String name, final CombineFn<AinT, AccT, AoutT> combiner) {
+
     Aggregator<AinT, AoutT> aggregator = new Aggregator<AinT, AoutT>() {
       @Override
       public void addValue(AinT value) {
@@ -561,7 +562,22 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
         return combiner;
       }
     };
-    accumulators.put(name, combiner.createAccumulator());
+
+    // Aggregator instantiation is idempotent
+    if (accumulators.containsKey(name)) {
+      Class<?> currentAccumClass = accumulators.get(name).getClass();
+      Class<?> createAccumClass = combiner.createAccumulator().getClass();
+      checkState(
+          currentAccumClass.isAssignableFrom(createAccumClass),
+          "Aggregator %s already initialized with accumulator type %s "
+              + "but was re-initialized with accumulator type %s",
+          name,
+          currentAccumClass,
+          createAccumClass);
+
+    } else {
+      accumulators.put(name, combiner.createAccumulator());
+    }
     return aggregator;
   }
