mr-runner: ensure Operation only start/finish once for diamond shaped DAG, this fixes ParDoLifecycleTest.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8627913e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8627913e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8627913e Branch: refs/heads/mr-runner Commit: 8627913eeb0a51a251a953930fc52025dbf8a723 Parents: e330d36 Author: Pei He <p...@apache.org> Authored: Fri Sep 1 14:10:22 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 1 17:13:53 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/mapreduce/translation/Operation.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8627913e/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java index bd24f05..a96806d 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java @@ -31,12 +31,16 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; public abstract class Operation<T> implements Serializable { private final OutputReceiver[] receivers; private SerializableConfiguration conf; + private boolean started; + private boolean finished; public Operation(int numOutputs) { this.receivers = new OutputReceiver[numOutputs]; for (int i = 0; i < numOutputs; ++i) { receivers[i] = new OutputReceiver(); } + this.started = false; + this.finished = false; } /** @@ -45,6 +49,10 @@ public abstract class Operation<T> implements Serializable { * <p>Called after all successors consuming operations have been started. */ public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) { + if (started) { + return; + } + started = true; conf = new SerializableConfiguration(taskContext.getConfiguration()); for (OutputReceiver receiver : receivers) { if (receiver == null) { @@ -67,6 +75,10 @@ public abstract class Operation<T> implements Serializable { * <p>Called after all predecessors producing operations have been finished. */ public void finish() { + if (finished) { + return; + } + finished = true; for (OutputReceiver receiver : receivers) { if (receiver == null) { continue;