mr-runner: ensure Operation only start/finish once for diamond shaped DAG, this 
fixes ParDoLifecycleTest.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8627913e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8627913e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8627913e

Branch: refs/heads/mr-runner
Commit: 8627913eeb0a51a251a953930fc52025dbf8a723
Parents: e330d36
Author: Pei He <p...@apache.org>
Authored: Fri Sep 1 14:10:22 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Fri Sep 1 17:13:53 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/mapreduce/translation/Operation.java   | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8627913e/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
index bd24f05..a96806d 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -31,12 +31,16 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 public abstract class Operation<T> implements Serializable {
   private final OutputReceiver[] receivers;
   private SerializableConfiguration conf;
+  private boolean started;
+  private boolean finished;
 
   public Operation(int numOutputs) {
     this.receivers = new OutputReceiver[numOutputs];
     for (int i = 0; i < numOutputs; ++i) {
       receivers[i] = new OutputReceiver();
     }
+    this.started = false;
+    this.finished = false;
   }
 
   /**
@@ -45,6 +49,10 @@ public abstract class Operation<T> implements Serializable {
    * <p>Called after all successors consuming operations have been started.
    */
   public void start(TaskInputOutputContext<Object, Object, Object, Object> 
taskContext) {
+    if (started) {
+      return;
+    }
+    started = true;
     conf = new SerializableConfiguration(taskContext.getConfiguration());
     for (OutputReceiver receiver : receivers) {
       if (receiver == null) {
@@ -67,6 +75,10 @@ public abstract class Operation<T> implements Serializable {
    * <p>Called after all predecessors producing operations have been finished.
    */
   public void finish() {
+    if (finished) {
+      return;
+    }
+    finished = true;
     for (OutputReceiver receiver : receivers) {
       if (receiver == null) {
         continue;

Reply via email to