mr-runner: support PCollections materialization with multiple MR jobs.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40396d75
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40396d75
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40396d75

Branch: refs/heads/mr-runner
Commit: 40396d758ad21e4938d395007583bf7c61ebdd97
Parents: 5905efd
Author: Pei He <p...@apache.org>
Authored: Tue Aug 8 11:30:29 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:49 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/MapReducePipelineOptions.java     |   4 +
 .../beam/runners/mapreduce/MapReduceRunner.java |  15 +-
 .../mapreduce/translation/BeamInputFormat.java  |  85 +++++++---
 .../mapreduce/translation/BeamMapper.java       |   8 +-
 .../mapreduce/translation/BeamReducer.java      |  20 +--
 .../mapreduce/translation/DotfileWriter.java    |  53 +++++-
 .../translation/FileReadOperation.java          | 165 +++++++++++++++++++
 .../translation/FileWriteOperation.java         |  77 +++++++++
 .../translation/FlattenTranslator.java          |   7 +-
 .../runners/mapreduce/translation/Graph.java    |  79 +++++----
 .../mapreduce/translation/GraphPlanner.java     |  67 +++++---
 .../runners/mapreduce/translation/Graphs.java   | 106 +++++++++---
 .../GroupAlsoByWindowsParDoOperation.java       |   5 +-
 .../translation/GroupByKeyTranslator.java       |   7 +-
 .../mapreduce/translation/JobPrototype.java     |  93 +++++++----
 .../mapreduce/translation/Operation.java        |  11 +-
 .../mapreduce/translation/OutputReceiver.java   |   9 +-
 .../mapreduce/translation/ParDoOperation.java   |  43 +++--
 .../mapreduce/translation/ParDoTranslator.java  |   8 +-
 .../translation/ReadBoundedTranslator.java      |  11 +-
 .../mapreduce/translation/ReadOperation.java    |  45 -----
 .../ReifyTimestampAndWindowsParDoOperation.java |   5 +-
 .../translation/ShuffleWriteOperation.java      |  62 +++++++
 .../mapreduce/translation/SourceOperation.java  |  45 +++++
 .../translation/TranslationContext.java         |   4 +-
 .../translation/TranslatorRegistry.java         |  11 +-
 .../mapreduce/translation/ViewTranslator.java   |   8 +-
 .../translation/WindowAssignTranslator.java     |   7 +-
 .../mapreduce/translation/WriteOperation.java   |  66 --------
 .../mapreduce/translation/GraphPlannerTest.java |   3 +-
 30 files changed, 801 insertions(+), 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index c37da58..9224eb6 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -43,6 +43,10 @@ public interface MapReducePipelineOptions extends 
PipelineOptions {
   Class<?> getJarClass();
   void setJarClass(Class<?> jarClass);
 
+  @Description("The directory for temporary files.")
+  String getTmpDir();
+  void setTmpDir(String tmpDir);
+
   class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> {
     @Override
     public Class<?> create(PipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index c5626a4..a7e75bb 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -66,19 +66,30 @@ public class MapReduceRunner extends 
PipelineRunner<PipelineResult> {
 
     LOG.info(graphConverter.getDotfile());
 
+    Graphs.FusedGraph fusedGraph = new 
Graphs.FusedGraph(context.getInitGraph());
+    LOG.info(DotfileWriter.toDotfile(fusedGraph));
+
     GraphPlanner planner = new GraphPlanner();
-    Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph());
+    fusedGraph = planner.plan(fusedGraph);
 
     LOG.info(DotfileWriter.toDotfile(fusedGraph));
 
+    Configuration config = new Configuration();
+    config.set("keep.failed.task.files", "true");
+
+    fusedGraph.getFusedSteps();
+
     int stageId = 0;
     for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
       JobPrototype jobPrototype = JobPrototype.create(stageId++, fusedStep, 
options);
+      LOG.info("Running job-{}.", stageId);
+      LOG.info(DotfileWriter.toDotfile(fusedStep));
       try {
-        Job job = jobPrototype.build(options.getJarClass(), new 
Configuration());
+        Job job = jobPrototype.build(options.getJarClass(), config);
         job.waitForCompletion(true);
       } catch (Exception e) {
         Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 8a27a85..03a88aa 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -21,14 +21,17 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Function;
 import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.commons.codec.binary.Base64;
@@ -45,10 +48,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 public class BeamInputFormat<T> extends InputFormat {
 
   public static final String BEAM_SERIALIZED_BOUNDED_SOURCE = 
"beam-serialized-bounded-source";
+  public static final String BEAM_SERIALIZED_PIPELINE_OPTIONS = 
"beam-serialized-pipeline-options";
+
   private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 
* 1000;
 
-  private BoundedSource<T> source;
-  private PipelineOptions options;
+  private List<BoundedSource<T>> sources;
+  private SerializedPipelineOptions options;
 
   public BeamInputFormat() {
   }
@@ -56,21 +61,36 @@ public class BeamInputFormat<T> extends InputFormat {
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
     String serializedBoundedSource = 
context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE);
-    if (Strings.isNullOrEmpty(serializedBoundedSource)) {
+    String serializedPipelineOptions =
+        context.getConfiguration().get(BEAM_SERIALIZED_PIPELINE_OPTIONS);
+    if (Strings.isNullOrEmpty(serializedBoundedSource)
+        || Strings.isNullOrEmpty(serializedPipelineOptions)) {
       return ImmutableList.of();
     }
-    source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray(
+    sources = (List<BoundedSource<T>>) 
SerializableUtils.deserializeFromByteArray(
         Base64.decodeBase64(serializedBoundedSource), "BoundedSource");
+    options = ((SerializedPipelineOptions) 
SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedPipelineOptions), 
"SerializedPipelineOptions"));
+
     try {
-      return 
FluentIterable.from(source.split(DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, 
options))
-          .transform(new Function<BoundedSource<T>, InputSplit>() {
+
+      return FluentIterable.from(sources)
+          .transformAndConcat(new Function<BoundedSource<T>, 
Iterable<BoundedSource<T>>>() {
             @Override
-            public InputSplit apply(BoundedSource<T> source) {
+            public Iterable<BoundedSource<T>> apply(BoundedSource<T> input) {
               try {
-                return new 
BeamInputSplit(source.getEstimatedSizeBytes(options));
+                return (Iterable<BoundedSource<T>>) input.split(
+                    DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, 
options.getPipelineOptions());
               } catch (Exception e) {
+                Throwables.throwIfUnchecked(e);
                 throw new RuntimeException(e);
               }
+            }
+          })
+          .transform(new Function<BoundedSource<T>, InputSplit>() {
+            @Override
+            public InputSplit apply(BoundedSource<T> source) {
+              return new BeamInputSplit(source, options);
             }})
           .toList();
     } catch (Exception e) {
@@ -81,26 +101,35 @@ public class BeamInputFormat<T> extends InputFormat {
   @Override
   public RecordReader createRecordReader(
       InputSplit split, TaskAttemptContext context) throws IOException, 
InterruptedException {
-    // TODO: it should initiates from InputSplit.
-    source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray(
-        
Base64.decodeBase64(context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE)),
-        "");
-    return new BeamRecordReader<>(source.createReader(options));
+    return ((BeamInputSplit) split).createReader();
   }
 
-  public static class BeamInputSplit extends InputSplit implements Writable {
-    private long estimatedSizeBytes;
+  public static class BeamInputSplit<T> extends InputSplit implements Writable 
{
+    private BoundedSource<T> boundedSource;
+    private SerializedPipelineOptions options;
 
     public BeamInputSplit() {
     }
 
-    BeamInputSplit(long estimatedSizeBytes) {
-      this.estimatedSizeBytes = estimatedSizeBytes;
+    public BeamInputSplit(BoundedSource<T> boundedSource, 
SerializedPipelineOptions options) {
+      this.boundedSource = checkNotNull(boundedSource, "boundedSources");
+      this.options = checkNotNull(options, "options");
+    }
+
+    public BeamRecordReader<T> createReader() throws IOException {
+      return new 
BeamRecordReader<>(boundedSource.createReader(options.getPipelineOptions()));
     }
 
     @Override
     public long getLength() throws IOException, InterruptedException {
-      return estimatedSizeBytes;
+      try {
+        return 
boundedSource.getEstimatedSizeBytes(options.getPipelineOptions());
+      } catch (Exception e) {
+        Throwables.throwIfUnchecked(e);
+        Throwables.throwIfInstanceOf(e, IOException.class);
+        Throwables.throwIfInstanceOf(e, InterruptedException.class);
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
@@ -110,16 +139,28 @@ public class BeamInputFormat<T> extends InputFormat {
 
     @Override
     public void write(DataOutput out) throws IOException {
-      out.writeLong(estimatedSizeBytes);
+      ByteArrayOutputStream stream = new ByteArrayOutputStream();
+      SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream);
+      SerializableCoder.of(SerializedPipelineOptions.class).encode(options, 
stream);
+
+      byte[] bytes = stream.toByteArray();
+      out.writeInt(bytes.length);
+      out.write(bytes);
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
-      estimatedSizeBytes = in.readLong();
+      int length = in.readInt();
+      byte[] bytes = new byte[length];
+      in.readFully(bytes);
+
+      ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
+      boundedSource = 
SerializableCoder.of(BoundedSource.class).decode(inStream);
+      options = 
SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream);
     }
   }
 
-  private class BeamRecordReader<T> extends RecordReader {
+  private static class BeamRecordReader<T> extends RecordReader {
 
     private final BoundedSource.BoundedReader<T> reader;
     private boolean started;

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index bc52967..d3ebb5c 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -19,17 +19,21 @@ package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.IOException;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Adapter for executing Beam transforms in {@link Mapper}.
  */
 public class BeamMapper<ValueInT, ValueOutT>
     extends Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamMapper.class);
 
   public static final String BEAM_PAR_DO_OPERATION_MAPPER = 
"beam-par-do-op-mapper";
 
@@ -50,7 +54,9 @@ public class BeamMapper<ValueInT, ValueOutT>
   protected void map(
       Object key,
       WindowedValue<ValueInT> value,
-      Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>>.Context context) {
+      Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>>.Context context)
+      throws IOException, InterruptedException {
+    LOG.info("key: {} value: {}.", key, value);
     operation.process(value);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
index 3490b3b..a382904 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -33,8 +33,11 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -42,6 +45,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
  */
 public class BeamReducer<ValueInT, ValueOutT>
     extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamReducer.class);
 
   public static final String BEAM_REDUCER_KV_CODER = "beam-reducer-kv-coder";
   public static final String BEAM_PAR_DO_OPERATION_REDUCER = 
"beam-par-do-op-reducer";
@@ -72,7 +76,8 @@ public class BeamReducer<ValueInT, ValueOutT>
   protected void reduce(
       BytesWritable key,
       Iterable<byte[]> values,
-      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context 
context) {
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context 
context)
+      throws InterruptedException, IOException {
     List<Object> decodedValues = Lists.newArrayList(FluentIterable.from(values)
         .transform(new Function<byte[], Object>() {
           @Override
@@ -85,15 +90,10 @@ public class BeamReducer<ValueInT, ValueOutT>
               throw new RuntimeException(e);
             }
           }}));
-
-    try {
-      operation.process(
-          WindowedValue.valueInGlobalWindow(
-              KV.of(keyCoder.decode(new ByteArrayInputStream(key.getBytes())), 
decodedValues)));
-    } catch (IOException e) {
-      Throwables.throwIfUnchecked(e);
-      throw new RuntimeException(e);
-    }
+    Object decodedKey = keyCoder.decode(new 
ByteArrayInputStream(key.getBytes()));
+    LOG.info("key: {} value: {}.", decodedKey, decodedValues);
+    operation.process(
+        WindowedValue.valueInGlobalWindow(KV.of(decodedKey, decodedValues)));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
index 5b0fcd8..863c4c9 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
@@ -17,36 +17,75 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * Class that outputs {@link Graph} to dot file.
  */
 public class DotfileWriter {
 
-  public static <StepT extends Graph.AbstractStep<TagT>, TagT extends 
Graph.AbstractTag>
+  public static <StepT extends Graph.AbstractStep, TagT extends 
Graph.AbstractTag>
   String toDotfile(Graphs.FusedGraph fusedGraph) {
     StringBuilder sb = new StringBuilder();
     sb.append("\ndigraph G {\n");
 
+    Map<Graphs.FusedStep, String> fusedStepToId = Maps.newHashMap();
     int i = 0;
     for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
-      sb.append(String.format("  subgraph \"cluster_%d\" {\n", i++));
+      String clusterId = String.format("cluster_%d", i++);
+      sb.append(String.format("  subgraph \"%s\" {\n", clusterId));
+      sb.append(String.format("    \"%s\" [shape=point style=invis];\n", 
clusterId));
+      fusedStepToId.put(fusedStep, clusterId);
+
+      Set<String> nodeDefines = Sets.newHashSet();
       for (Graphs.Step step : fusedStep.getSteps()) {
-        sb.append(String.format("    \"%s\" [shape=box];\n", 
step.getFullName()));
-        for (Graph.AbstractTag outTag : step.getOutputTags()) {
-          sb.append(String.format("    \"%s\" [shape=ellipse];\n", outTag));
+        nodeDefines.add(String.format("    \"%s\" [shape=box];\n", 
step.getFullName()));
+        for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
+          nodeDefines.add(String.format("    \"%s\" [shape=ellipse];\n", 
inTag));
         }
+        for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
+          nodeDefines.add(String.format("    \"%s\" [shape=ellipse];\n", 
outTag));
+        }
+      }
+      for (String str : nodeDefines) {
+        sb.append(str);
       }
       sb.append(String.format("  }"));
     }
     for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      // Edges within fused steps.
       for (Graphs.Step step : fusedStep.getSteps()) {
-        for (Graph.AbstractTag inTag : step.getInputTags()) {
+        for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
           sb.append(String.format("  \"%s\" -> \"%s\";\n", inTag, step));
         }
-        for (Graph.AbstractTag outTag : step.getOutputTags()) {
+        for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
           sb.append(String.format("  \"%s\" -> \"%s\";\n", step, outTag));
         }
       }
+
+      // Edges between sub-graphs.
+      for (Graphs.Tag inTag : fusedGraph.getInputTags(fusedStep)) {
+        sb.append(String.format("  \"%s\" -> \"%s\";\n", inTag, 
fusedStepToId.get(fusedStep)));
+      }
+    }
+    sb.append("}\n");
+    return sb.toString();
+  }
+
+  public static String toDotfile(Graphs.FusedStep fusedStep) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\ndigraph G {\n");
+    for (Graphs.Step step : fusedStep.getSteps()) {
+      sb.append(String.format("  \"%s\" [shape=box];\n", step.getFullName()));
+      for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
+        sb.append(String.format("  \"%s\" -> \"%s\";\n", inTag, step));
+      }
+      for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
+        sb.append(String.format("  \"%s\" -> \"%s\";\n", step, outTag));
+      }
     }
     sb.append("}\n");
     return sb.toString();

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
new file mode 100644
index 0000000..674e30a
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Operation that reads {@link WindowedValue}s back from {@link SequenceFile}s.
+ *
+ * <p>Each record value is read into a {@link BytesWritable} and decoded with
+ * the full {@link WindowedValue} coder (global window coder hard-coded).
+ */
+public class FileReadOperation<T> extends SourceOperation<WindowedValue<T>> {
+
+  /**
+   * @param producerStageId stage id kept on the source; it is currently not
+   *     used to locate the files (see the note in {@code createReader})
+   * @param fileName prefix of the file names to read, must not be null
+   * @param coder coder of the element values, must not be null
+   */
+  public FileReadOperation(
+      int producerStageId, String fileName, Coder<T> coder) {
+    super(new FileBoundedSource<>(producerStageId, fileName, coder));
+  }
+
+  /** Unsplittable source over all files matching the file name prefix. */
+  private static class FileBoundedSource<T>
+      extends BoundedSource<WindowedValue<T>> {
+
+    private final int producerStageId;
+    private final String fileName;
+    private final Coder<WindowedValue<T>> coder;
+
+    FileBoundedSource(int producerStageId, String fileName, Coder<T> coder) {
+      this.producerStageId = producerStageId;
+      this.fileName = checkNotNull(fileName, "fileName");
+      checkNotNull(coder, "coder");
+      this.coder = WindowedValue.getFullCoder(
+          coder,
+          WindowingStrategy.globalDefault().getWindowFn().windowCoder());
+    }
+
+    @Override
+    public List<? extends BoundedSource<WindowedValue<T>>> split(
+        long desiredBundleSizeBytes, PipelineOptions options)
+        throws Exception {
+      // TODO: support split.
+      return ImmutableList.of(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options)
+        throws Exception {
+      return 0;
+    }
+
+    /**
+     * Opens one {@link SequenceFile.Reader} per file matching the pattern.
+     *
+     * @throws IOException if globbing or opening any file fails; readers
+     *     opened before the failure are closed first
+     */
+    @Override
+    public BoundedReader<WindowedValue<T>> createReader(
+        PipelineOptions options) throws IOException {
+      // NOTE(review): the directory is hard-coded to stage-2 and ignores
+      // producerStageId -- confirm against the writer side before changing.
+      Path pattern =
+          new Path(String.format("/tmp/mapreduce/stage-2/%s*", fileName));
+      // TODO: use config from the job.
+      Configuration conf = new Configuration();
+      conf.set(
+          "io.serializations",
+          "org.apache.hadoop.io.serializer.WritableSerialization,"
+              + "org.apache.hadoop.io.serializer.JavaSerialization");
+      FileSystem fs = pattern.getFileSystem(conf);
+      FileStatus[] files = fs.globStatus(pattern);
+      Queue<SequenceFile.Reader> readers = new LinkedList<>();
+      try {
+        for (FileStatus f : files) {
+          // Open each matched file, not the first file repeatedly.
+          readers.add(new SequenceFile.Reader(fs, f.getPath(), conf));
+        }
+      } catch (IOException | RuntimeException e) {
+        // Do not leak the readers that were opened before the failure.
+        try {
+          closeAll(readers);
+        } catch (IOException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+        throw e;
+      }
+      return new Reader<>(this, readers, coder);
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<WindowedValue<T>> getDefaultOutputCoder() {
+      return coder;
+    }
+
+    /**
+     * Closes and removes every reader in the queue. All readers are
+     * attempted; the first failure is rethrown with later ones suppressed.
+     */
+    private static void closeAll(Queue<SequenceFile.Reader> readers)
+        throws IOException {
+      IOException failure = null;
+      SequenceFile.Reader reader;
+      while ((reader = readers.poll()) != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          if (failure == null) {
+            failure = e;
+          } else {
+            failure.addSuppressed(e);
+          }
+        }
+      }
+      if (failure != null) {
+        throw failure;
+      }
+    }
+
+    /** Reads the queued files one after another, in queue order. */
+    private static class Reader<T> extends BoundedReader<WindowedValue<T>> {
+
+      private final BoundedSource<WindowedValue<T>> boundedSource;
+      private final Queue<SequenceFile.Reader> readers;
+      private final Coder<WindowedValue<T>> coder;
+      // Reused for every record; holds the encoded current element.
+      private final BytesWritable value = new BytesWritable();
+
+      Reader(
+          BoundedSource<WindowedValue<T>> boundedSource,
+          Queue<SequenceFile.Reader> readers,
+          Coder<WindowedValue<T>> coder) {
+        this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+        this.readers = checkNotNull(readers, "readers");
+        this.coder = checkNotNull(coder, "coder");
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      /**
+       * Moves to the next record, closing and dropping exhausted files.
+       *
+       * @return false once every queued file has been fully read
+       */
+      @Override
+      public boolean advance() throws IOException {
+        // Loop instead of recursing so that a long run of exhausted files
+        // cannot overflow the stack.
+        SequenceFile.Reader reader;
+        while ((reader = readers.peek()) != null) {
+          if (reader.next(NullWritable.get(), value)) {
+            return true;
+          }
+          reader.close();
+          readers.remove();
+        }
+        return false;
+      }
+
+      @Override
+      public WindowedValue<T> getCurrent() throws NoSuchElementException {
+        // getBytes() exposes the reused backing array, which may be longer
+        // than the current record; only getLength() bytes are valid.
+        ByteArrayInputStream inStream =
+            new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
+        try {
+          return coder.decode(inStream);
+        } catch (IOException e) {
+          Throwables.throwIfUnchecked(e);
+          throw new RuntimeException(e);
+        }
+      }
+
+      /** Closes any files that have not been read to the end. */
+      @Override
+      public void close() throws IOException {
+        closeAll(readers);
+      }
+
+      @Override
+      public BoundedSource<WindowedValue<T>> getCurrentSource() {
+        return boundedSource;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
new file mode 100644
index 0000000..468856a
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+
+/**
+ * Operation that writes elements to files through {@link MultipleOutputs}.
+ *
+ * <p>Every element is encoded with the full {@link WindowedValue} coder and
+ * written as a {@link BytesWritable} value under a {@link NullWritable} key.
+ */
+public class FileWriteOperation<T> extends Operation<T> {
+
+  private final String fileName;
+  private final Coder<WindowedValue<T>> coder;
+  // Created in start(); stays null until then.
+  private transient MultipleOutputs mos;
+
+  /**
+   * @param fileName named output passed to {@link MultipleOutputs},
+   *     must not be null
+   * @param coder coder of the element values, must not be null
+   */
+  public FileWriteOperation(String fileName, Coder<T> coder) {
+    super(0);
+    this.fileName = checkNotNull(fileName, "fileName");
+    checkNotNull(coder, "coder");
+    // TODO: should not hard-code windows coder.
+    this.coder = WindowedValue.getFullCoder(
+        coder, WindowingStrategy.globalDefault().getWindowFn().windowCoder());
+  }
+
+  @Override
+  public void start(
+      TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    this.mos = new MultipleOutputs(taskContext);
+  }
+
+  /** Encodes {@code elem} and writes it to the named output. */
+  @Override
+  public void process(WindowedValue<T> elem)
+      throws IOException, InterruptedException {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    coder.encode(elem, stream);
+
+    mos.write(
+        fileName, NullWritable.get(), new BytesWritable(stream.toByteArray()));
+  }
+
+  /**
+   * Closes the underlying {@link MultipleOutputs}.
+   *
+   * @throws RuntimeException wrapping any checked failure from close
+   */
+  @Override
+  public void finish() {
+    if (mos == null) {
+      // start() was never called, so there is nothing to close.
+      return;
+    }
+    try {
+      mos.close();
+    } catch (InterruptedException e) {
+      // This override declares no checked exceptions, so restore the
+      // interrupt status before wrapping instead of silently dropping it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
index 8860caf..b966f2a 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
@@ -28,10 +28,9 @@ public class FlattenTranslator<T> extends 
TransformTranslator.Default<Flatten.PC
     TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
 
     Operation<?> operation = new FlattenOperation();
-    context.addInitStep(Graphs.Step.of(
-        userGraphContext.getStepName(),
-        operation,
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), operation),
         userGraphContext.getInputTags(),
-        userGraphContext.getOutputTags()));
+        userGraphContext.getOutputTags());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index b6900cc..66e573f 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -21,19 +21,23 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import com.google.common.graph.ElementOrder;
 import com.google.common.graph.GraphBuilder;
 import com.google.common.graph.MutableGraph;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Objects;
 import java.util.Set;
 
 /**
  * Graph that represents a Beam DAG.
  */
-public class Graph<StepT extends Graph.AbstractStep<TagT>, TagT extends 
Graph.AbstractTag> {
+public class Graph<StepT extends Graph.AbstractStep, TagT extends 
Graph.AbstractTag> {
 
-  private final MutableGraph<Vertex> graph;
+  public final MutableGraph<Vertex> graph;
 
   public Graph() {
     this.graph = GraphBuilder.directed()
@@ -45,16 +49,16 @@ public class Graph<StepT extends Graph.AbstractStep<TagT>, 
TagT extends Graph.Ab
   /**
    * Adds {@link StepT} to this {@link Graph}.
    */
-  public void addStep(StepT step) {
+  public void addStep(StepT step, List<TagT> inTags, List<TagT> outTags) {
     graph.addNode(step);
     Set<Vertex> nodes = graph.nodes();
-    for (TagT tag : step.getInputTags()) {
+    for (TagT tag : inTags) {
       if (!nodes.contains(tag)) {
         graph.addNode(tag);
       }
       graph.putEdge(tag, step);
     }
-    for (TagT tag : step.getOutputTags()) {
+    for (TagT tag : outTags) {
       if (!nodes.contains(tag)) {
         graph.addNode(tag);
       }
@@ -93,7 +97,18 @@ public class Graph<StepT extends Graph.AbstractStep<TagT>, 
TagT extends Graph.Ab
           public boolean apply(Vertex input) {
             return input instanceof AbstractStep;
           }}))
-        .toList();
+        .toSortedList(new Comparator<StepT>() {
+          @Override
+          public int compare(StepT left, StepT right) {
+            if (left.equals(right)) {
+              return 0;
+            } else if (com.google.common.graph.Graphs.reachableNodes(graph, 
left).contains(right)) {
+              return -1;
+            } else {
+              return 1;
+            }
+          }
+        });
   }
 
   public List<StepT> getStartSteps() {
@@ -106,32 +121,40 @@ public class Graph<StepT extends 
Graph.AbstractStep<TagT>, TagT extends Graph.Ab
         .toList();
   }
 
-  public List<TagT> getInputTags() {
-    return castToTagList(FluentIterable.from(graph.nodes())
-        .filter(new Predicate<Vertex>() {
-          @Override
-          public boolean apply(Vertex input) {
-            return input instanceof AbstractTag && graph.inDegree(input) == 0;
-          }}))
-        .toList();
+  public StepT getProducer(TagT tag) {
+    if (contains(tag)) {
+      return (StepT) Iterables.getOnlyElement(graph.predecessors(tag));
+    } else {
+      return null;
+    }
   }
 
-  public List<TagT> getOutputTags() {
-    return castToTagList(FluentIterable.from(graph.nodes())
-        .filter(new Predicate<Vertex>() {
-          @Override
-          public boolean apply(Vertex input) {
-            return input instanceof AbstractTag && graph.outDegree(input) == 0;
-          }}))
-        .toList();
+  public List<StepT> getConsumers(TagT tag) {
+    if (contains(tag)) {
+      return castToStepList(graph.successors(tag)).toList();
+    } else {
+      return Collections.emptyList();
+    }
   }
 
-  public StepT getProducer(TagT tag) {
-    return (StepT) Iterables.getOnlyElement(graph.predecessors(tag));
+  public List<TagT> getInputTags(StepT step) {
+    if (contains(step)) {
+      return castToTagList(graph.predecessors(step)).toList();
+    } else {
+      return Collections.emptyList();
+    }
   }
 
-  public List<StepT> getConsumers(TagT tag) {
-    return castToStepList(graph.successors(tag)).toList();
+  public List<TagT> getOutputTags(StepT step) {
+    if (contains(step)) {
+      return castToTagList(graph.successors(step)).toList();
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  private boolean contains(Vertex node) {
+    return graph.nodes().contains(node);
   }
 
   private FluentIterable<StepT> castToStepList(Iterable<Vertex> vertices) {
@@ -175,9 +198,7 @@ public class Graph<StepT extends Graph.AbstractStep<TagT>, 
TagT extends Graph.Ab
   interface Vertex {
   }
 
-  public abstract static class AbstractStep<TagT extends AbstractTag> 
implements Vertex {
-    public abstract List<TagT> getInputTags();
-    public abstract List<TagT> getOutputTags();
+  public abstract static class AbstractStep implements Vertex {
   }
 
   public abstract static class AbstractTag implements Vertex {

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index be694e4..13d215f 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -17,8 +17,9 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Class that optimizes the initial graph to a fused graph.
@@ -29,31 +30,47 @@ public class GraphPlanner {
   public GraphPlanner() {
   }
 
-  public Graphs.FusedGraph plan(Graph<Graphs.Step, Graphs.Tag> initGraph) {
-    Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph();
-    // Convert from the list of steps to Graphs.
-    for (Graphs.Step step : Lists.reverse(initGraph.getSteps())) {
-      Graphs.FusedStep fusedStep = new Graphs.FusedStep();
-      fusedStep.addStep(step);
-      fusedGraph.addFusedStep(fusedStep);
+  public Graphs.FusedGraph plan(Graphs.FusedGraph fusedGraph) {
+    // Attach writes/reads on fusion boundaries.
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      for (Graphs.Tag tag : fusedGraph.getOutputTags(fusedStep)) {
+        List<Graphs.FusedStep> consumers = fusedGraph.getConsumers(tag);
+        if (consumers.isEmpty()) {
+          continue;
+        }
+        Graphs.Step producer = fusedStep.getProducer(tag);
+        if (producer.getOperation() instanceof ViewOperation) {
+          continue;
+        }
+        String tagName = tag.getName();
+        String fileName = tagName.replaceAll("[^A-Za-z0-9]", "0");
+        fusedStep.addStep(
+            Graphs.Step.of(
+                tagName + "/Write",
+                new FileWriteOperation(fileName, tag.getCoder())),
+            ImmutableList.of(tag),
+            ImmutableList.<Graphs.Tag>of());
 
-      tryFuse(fusedGraph, fusedStep);
+        String readStepName = tagName + "/Read";
+        Graphs.Tag readOutput = Graphs.Tag.of(
+            readStepName + ".out", new TupleTag<>(), tag.getCoder());
+        for (Graphs.FusedStep consumer : consumers) {
+          // Re-direct tag to readOutput.
+          List<Graphs.Step> receivers = consumer.getConsumers(tag);
+          for (Graphs.Step step : receivers) {
+            consumer.addEdge(readOutput, step);
+          }
+          consumer.removeTag(tag);
+          consumer.addStep(
+              Graphs.Step.of(
+                  readStepName,
+                  new FileReadOperation(fusedStep.getStageId(), fileName, 
tag.getCoder())),
+              ImmutableList.<Graphs.Tag>of(),
+              ImmutableList.of(readOutput));
+        }
+      }
     }
-    return fusedGraph;
-  }
 
-  private void tryFuse(Graphs.FusedGraph fusedGraph, Graphs.FusedStep 
fusedStep) {
-    if (fusedStep.getOutputTags().size() != 1) {
-      return;
-    }
-    Graphs.Tag outTag = Iterables.getOnlyElement(fusedStep.getOutputTags());
-    if (fusedGraph.getConsumers(outTag).size() != 1) {
-      return;
-    }
-    Graphs.FusedStep consumer = 
Iterables.getOnlyElement(fusedGraph.getConsumers(outTag));
-    if (fusedStep.containsGroupByKey() && consumer.containsGroupByKey()) {
-      return;
-    }
-    fusedGraph.merge(fusedStep, consumer);
+    return fusedGraph;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
index cef5afc..97b5441 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.mapreduce.translation;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
@@ -32,26 +34,68 @@ public class Graphs {
 
   public static class FusedGraph {
     private final Graph<FusedStep, Tag> graph;
+    private int stageId = 0;
 
     public FusedGraph() {
       this.graph = new Graph<>();
     }
 
-    public void addFusedStep(FusedStep fusedStep) {
-      graph.addStep(fusedStep);
+    public FusedGraph(Graph<Graphs.Step, Tag> initGraph) {
+      this.graph = new Graph<>();
+
+      // Convert from the list of steps to Graphs.
+      for (Graphs.Step step : Lists.reverse(initGraph.getSteps())) {
+        tryFuse(step, initGraph.getInputTags(step), 
initGraph.getOutputTags(step));
+      }
+      // Remove unused external tags.
+      for (FusedStep fusedStep : graph.getSteps()) {
+        for (Tag outTag : graph.getOutputTags(fusedStep)) {
+          if (graph.getConsumers(outTag).isEmpty()) {
+            graph.removeTag(outTag);
+          }
+        }
+      }
     }
 
-    public void merge(FusedStep src, FusedStep dest) {
-      for (Step step : src.steps.getSteps()) {
-        dest.addStep(step);
+    public void tryFuse(
+        Graphs.Step step,
+        List<Graphs.Tag> inTags,
+        List<Graphs.Tag> outTags) {
+      if (canFuse(step, inTags, outTags)) {
+        Graphs.Tag outTag = Iterables.getOnlyElement(outTags);
+        Graphs.FusedStep consumer = 
Iterables.getOnlyElement(graph.getConsumers(outTag));
+        consumer.addStep(step, inTags, outTags);
+        for (Graphs.Tag in : inTags) {
+          graph.addEdge(in, consumer);
+        }
+        graph.removeTag(outTag);
+        graph.addEdge(consumer, outTag);
+      } else {
+        Graphs.FusedStep newFusedStep = new Graphs.FusedStep(stageId++);
+        newFusedStep.addStep(step, inTags, outTags);
+        graph.addStep(newFusedStep, inTags, outTags);
+      }
+    }
+
+    private boolean canFuse(
+        Graphs.Step step,
+        List<Graphs.Tag> inTags,
+        List<Graphs.Tag> outTags) {
+      if (step.getOperation() instanceof ViewOperation) {
+        return false;
+      }
+      if (outTags.size() != 1) {
+        return false;
       }
-      for (Tag inTag : src.getInputTags()) {
-        graph.addEdge(inTag, dest);
+      Graphs.Tag outTag = Iterables.getOnlyElement(outTags);
+      if (graph.getConsumers(outTag).size() != 1) {
+        return false;
       }
-      for (Tag outTag : src.getOutputTags()) {
-        graph.addEdge(dest, outTag);
+      Graphs.FusedStep consumer = 
Iterables.getOnlyElement(graph.getConsumers(outTag));
+      if (consumer.containsGroupByKey() && step.getOperation() instanceof 
GroupByKeyOperation) {
+        return false;
       }
-      graph.removeStep(src);
+      return true;
     }
 
     public FusedStep getProducer(Tag tag) {
@@ -65,29 +109,41 @@ public class Graphs {
     public List<FusedStep> getFusedSteps() {
       return graph.getSteps();
     }
+
+    public List<Tag> getInputTags(FusedStep fusedStep) {
+      return graph.getInputTags(fusedStep);
+    }
+
+    public List<Tag> getOutputTags(FusedStep fusedStep) {
+      return graph.getOutputTags(fusedStep);
+    }
   }
 
-  public static class FusedStep extends Graph.AbstractStep<Tag> {
+  public static class FusedStep extends Graph.AbstractStep {
+    private final int stageId;
     private final Graph<Step, Tag> steps;
     private Step groupByKeyStep;
 
-    public FusedStep() {
+    public FusedStep(int stageid) {
+      this.stageId = stageid;
       this.steps = new Graph<>();
       this.groupByKeyStep = null;
     }
 
-    @Override
-    public List<Tag> getInputTags() {
-      return steps.getInputTags();
+    public int getStageId() {
+      return stageId;
     }
 
-    @Override
-    public List<Tag> getOutputTags() {
-      return steps.getOutputTags();
+    public List<Tag> getInputTags(Step step) {
+      return steps.getInputTags(step);
+    }
+
+    public List<Tag> getOutputTags(Step step) {
+      return steps.getOutputTags(step);
     }
 
-    public void addStep(Step step) {
-      steps.addStep(step);
+    public void addStep(Step step, List<Tag> inTags, List<Tag> outTags) {
+      steps.addStep(step, inTags, outTags);
       if (step.getOperation() instanceof GroupByKeyOperation) {
         groupByKeyStep = step;
       }
@@ -156,18 +212,14 @@ public class Graphs {
   }
 
   @AutoValue
-  public abstract static class Step extends Graph.AbstractStep<Tag> {
+  public abstract static class Step extends Graph.AbstractStep {
     abstract String getFullName();
     // TODO: remove public
     public abstract Operation getOperation();
 
-    public static Step of(
-        String fullName,
-        Operation operation,
-        List<Tag> inputTags,
-        List<Tag> outputTags) {
+    public static Step of(String fullName, Operation operation) {
       return new 
org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Step(
-          inputTags, outputTags, fullName, operation);
+          fullName, operation);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
index 66cf3b6..1ae38da 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
@@ -38,8 +38,9 @@ public class GroupAlsoByWindowsParDoOperation extends 
ParDoOperation {
   public GroupAlsoByWindowsParDoOperation(
       PipelineOptions options,
       WindowingStrategy<?, ?> windowingStrategy,
-      Coder<?> inputCoder) {
-    super(options, new TupleTag<>(), ImmutableList.<TupleTag<?>>of(), 
windowingStrategy);
+      Coder<?> inputCoder,
+      Graphs.Tag outTag) {
+    super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), 
windowingStrategy);
     this.inputCoder = checkNotNull(inputCoder, "inputCoder");
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
index e87ed09..4c627d7 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
@@ -37,10 +37,9 @@ class GroupByKeyTranslator<K, V> extends 
TransformTranslator.Default<GroupByKey<
 
     GroupByKeyOperation<K, V> groupByKeyOperation =
         new GroupByKeyOperation<>(windowingStrategy, (KvCoder<K, V>) inCoder);
-    context.addInitStep(Graphs.Step.of(
-        userGraphContext.getStepName(),
-        groupByKeyOperation,
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), groupByKeyOperation),
         userGraphContext.getInputTags(),
-        userGraphContext.getOutputTags()));
+        userGraphContext.getOutputTags());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 24feebd..1016e22 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -20,13 +20,14 @@ package org.apache.beam.runners.mapreduce.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -38,8 +39,12 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 /**
  * Class that translates a {@link Graphs.FusedStep} to a MapReduce job.
@@ -53,13 +58,11 @@ public class JobPrototype {
 
   private final int stageId;
   private final Graphs.FusedStep fusedStep;
-  private final Set<JobPrototype> dependencies;
   private final PipelineOptions options;
 
   private JobPrototype(int stageId, Graphs.FusedStep fusedStep, 
PipelineOptions options) {
     this.stageId = stageId;
     this.fusedStep = checkNotNull(fusedStep, "fusedStep");
-    this.dependencies = Sets.newHashSet();
     this.options = checkNotNull(options, "options");
   }
 
@@ -72,19 +75,38 @@ public class JobPrototype {
         "org.apache.hadoop.io.serializer.WritableSerialization,"
             + "org.apache.hadoop.io.serializer.JavaSerialization");
 
+    //TODO: config out dir with PipelineOptions.
+    conf.set(
+        FileOutputFormat.OUTDIR,
+        String.format("/tmp/mapreduce/stage-%d", fusedStep.getStageId()));
+
     // Setup BoundedSources in BeamInputFormat.
     // TODO: support more than one read steps by introducing a composed 
BeamInputFormat
     // and a partition operation.
-    Graphs.Step readStep = Iterables.getOnlyElement(fusedStep.getStartSteps());
-    checkState(readStep.getOperation() instanceof ReadOperation);
-    BoundedSource source = ((ReadOperation) 
readStep.getOperation()).getSource();
+    List<Graphs.Step> readSteps = fusedStep.getStartSteps();
+    ArrayList<BoundedSource<?>> sources = new ArrayList<>();
+    sources.addAll(
+        FluentIterable.from(readSteps)
+            .transform(new Function<Graphs.Step, BoundedSource<?>>() {
+              @Override
+              public BoundedSource<?> apply(Graphs.Step step) {
+                checkState(step.getOperation() instanceof SourceOperation);
+                return ((SourceOperation) step.getOperation()).getSource();
+              }})
+            .toList());
+
     conf.set(
         BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
-        
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source)));
+        
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(sources)));
+    conf.set(
+        BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS,
+        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+            new SerializedPipelineOptions(options))));
     job.setInputFormatClass(BeamInputFormat.class);
 
     if (fusedStep.containsGroupByKey()) {
       Graphs.Step groupByKey = fusedStep.getGroupByKeyStep();
+      Graphs.Tag gbkOutTag = 
Iterables.getOnlyElement(fusedStep.getOutputTags(groupByKey));
       GroupByKeyOperation operation = (GroupByKeyOperation) 
groupByKey.getOperation();
       WindowingStrategy<?, ?> windowingStrategy = 
operation.getWindowingStrategy();
       KvCoder<?, ?> kvCoder = operation.getKvCoder();
@@ -92,28 +114,26 @@ public class JobPrototype {
       String reifyStepName = groupByKey.getFullName() + "-Reify";
       Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), 
windowingStrategy);
       Graphs.Tag reifyOutputTag = Graphs.Tag.of(
-          reifyStepName + ".out", new TupleTag<Object>(), reifyValueCoder);
+          reifyStepName + ".out", new TupleTag<>(), reifyValueCoder);
       Graphs.Step reifyStep = Graphs.Step.of(
           reifyStepName,
-          new ReifyTimestampAndWindowsParDoOperation(options, 
operation.getWindowingStrategy()),
-          groupByKey.getInputTags(),
-          ImmutableList.of(reifyOutputTag));
+          new ReifyTimestampAndWindowsParDoOperation(
+              options, operation.getWindowingStrategy(), reifyOutputTag));
 
       Graphs.Step writeStep = Graphs.Step.of(
           groupByKey.getFullName() + "-Write",
-          new WriteOperation(kvCoder.getKeyCoder(), reifyValueCoder),
-          ImmutableList.of(reifyOutputTag),
-          Collections.<Graphs.Tag>emptyList());
+          new ShuffleWriteOperation(kvCoder.getKeyCoder(), reifyValueCoder));
 
       Graphs.Step gabwStep = Graphs.Step.of(
           groupByKey.getFullName() + "-GroupAlsoByWindows",
-          new GroupAlsoByWindowsParDoOperation(options, windowingStrategy, 
kvCoder),
-          Collections.<Graphs.Tag>emptyList(),
-          groupByKey.getOutputTags());
-
-      fusedStep.addStep(reifyStep);
-      fusedStep.addStep(writeStep);
-      fusedStep.addStep(gabwStep);
+          new GroupAlsoByWindowsParDoOperation(options, windowingStrategy, 
kvCoder, gbkOutTag));
+
+      fusedStep.addStep(
+          reifyStep, fusedStep.getInputTags(groupByKey), 
ImmutableList.of(reifyOutputTag));
+      fusedStep.addStep(
+          writeStep, ImmutableList.of(reifyOutputTag), 
Collections.<Graphs.Tag>emptyList());
+      fusedStep.addStep(
+          gabwStep, Collections.<Graphs.Tag>emptyList(), 
ImmutableList.of(gbkOutTag));
       fusedStep.removeStep(groupByKey);
 
       // Setup BeamReducer
@@ -129,8 +149,9 @@ public class JobPrototype {
               
SerializableUtils.serializeToByteArray(reducerStartStep.getOperation())));
       job.setReducerClass(BeamReducer.class);
     }
+
     // Setup DoFns in BeamMapper.
-    Graphs.Tag readOutputTag = 
Iterables.getOnlyElement(readStep.getOutputTags());
+    Graphs.Tag readOutputTag = 
Iterables.getOnlyElement(fusedStep.getOutputTags(readSteps.get(0)));
     Graphs.Step mapperStartStep = 
Iterables.getOnlyElement(fusedStep.getConsumers(readOutputTag));
     chainOperations(mapperStartStep, fusedStep);
 
@@ -141,18 +162,28 @@ public class JobPrototype {
         Base64.encodeBase64String(
             
SerializableUtils.serializeToByteArray(mapperStartStep.getOperation())));
     job.setMapperClass(BeamMapper.class);
-
-    job.setOutputFormatClass(NullOutputFormat.class);
-
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    for (Graphs.Step step : fusedStep.getSteps()) {
+      if (step.getOperation() instanceof FileWriteOperation) {
+        FileWriteOperation writeOperation = (FileWriteOperation) 
step.getOperation();
+        //SequenceFileOutputFormat.setOutputPath(job, new 
Path("/tmp/mapreduce/"));
+        MultipleOutputs.addNamedOutput(
+            job,
+            writeOperation.getFileName(),
+            SequenceFileOutputFormat.class,
+            NullWritable.class, BytesWritable.class);
+      }
+    }
     return job;
   }
 
   private void chainOperations(Graphs.Step current, Graphs.FusedStep 
fusedStep) {
     Operation<?> operation = current.getOperation();
-    List<Graphs.Tag> outputTags = current.getOutputTags();
-    for (int index = 0; index < outputTags.size(); ++index) {
-      for (Graphs.Step consumer : 
fusedStep.getConsumers(outputTags.get(index))) {
-        operation.attachConsumer(index, consumer.getOperation());
+    List<Graphs.Tag> outputTags = fusedStep.getOutputTags(current);
+    for (Graphs.Tag outTag : outputTags) {
+      for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
+        operation.attachConsumer(outTag.getTupleTag(), 
consumer.getOperation());
       }
     }
     for (Graphs.Tag outTag : outputTags) {

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
index 187ea79..574f152 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -18,9 +18,11 @@
 package org.apache.beam.runners.mapreduce.translation;
 
 import com.google.common.collect.ImmutableList;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /**
@@ -55,7 +57,7 @@ public abstract class Operation<T> implements Serializable {
   /**
    * Processes the element.
    */
-  public abstract void process(WindowedValue<T> elem);
+  public abstract void process(WindowedValue<T> elem) throws IOException, 
InterruptedException;
 
   /**
    * Finishes this Operation's execution.
@@ -80,8 +82,13 @@ public abstract class Operation<T> implements Serializable {
   /**
    * Adds an output to this Operation.
    */
-  public void attachConsumer(int outputIndex, Operation consumer) {
+  public void attachConsumer(TupleTag<?> tupleTag, Operation consumer) {
+    int outputIndex = getOutputIndex(tupleTag);
     OutputReceiver fanOut = receivers[outputIndex];
     fanOut.addOutput(consumer);
   }
+
+  protected int getOutputIndex(TupleTag<?> tupleTag) {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
index 3dab890..b2f1b6d 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -46,7 +48,12 @@ public class OutputReceiver implements Serializable {
   public void process(WindowedValue<?> elem) {
     for (Operation out : receivingOperations) {
       if (out != null) {
-        out.process(elem);
+        try {
+          out.process(elem);
+        } catch (IOException | InterruptedException e) {
+          Throwables.throwIfUnchecked(e);
+          throw new RuntimeException(e);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index a76773f..c6bf49c 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import java.util.List;
 import javax.annotation.Nullable;
@@ -26,24 +27,23 @@ import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Operation for ParDo.
  */
 public abstract class ParDoOperation<InputT, OutputT> extends 
Operation<InputT> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ParDoOperation.class);
-
   protected final SerializedPipelineOptions options;
   protected final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
   protected final WindowingStrategy<?, ?> windowingStrategy;
 
+  protected DoFnInvoker<InputT, OutputT> doFnInvoker;
   private DoFnRunner<InputT, OutputT> fnRunner;
 
   public ParDoOperation(
@@ -65,6 +65,12 @@ public abstract class ParDoOperation<InputT, OutputT> 
extends Operation<InputT>
 
   @Override
   public void start(TaskInputOutputContext<Object, Object, Object, Object> 
taskContext) {
+    super.start(taskContext);
+    DoFn<InputT, OutputT> doFn = getDoFn();
+    // Process user's setup
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+
     fnRunner = DoFnRunners.simpleRunner(
         options.getPipelineOptions(),
         getDoFn(),
@@ -75,7 +81,6 @@ public abstract class ParDoOperation<InputT, OutputT> extends 
Operation<InputT>
         null,
         windowingStrategy);
     fnRunner.startBundle();
-    super.start(taskContext);
   }
 
   /**
@@ -83,14 +88,27 @@ public abstract class ParDoOperation<InputT, OutputT> 
extends Operation<InputT>
    */
   @Override
   public void process(WindowedValue<InputT> elem) {
-    LOG.info("elem: {}.", elem);
     fnRunner.processElement(elem);
   }
 
   @Override
   public void finish() {
-    super.finish();
     fnRunner.finishBundle();
+    doFnInvoker.invokeTeardown();
+    super.finish();
+  }
+
+  @Override
+  protected int getOutputIndex(TupleTag<?> tupleTag) {
+    // Compare by value rather than identity, so an equal tag that is not the very
+    // same instance as mainOutputTag still resolves to the main output (index 0).
+    if (tupleTag.equals(mainOutputTag)) {
+      return 0;
+    } else {
+      int sideIndex = sideOutputTags.indexOf(tupleTag);
+      checkState(sideIndex >= 0, "Cannot find index for tuple tag: %s.", tupleTag);
+      return sideIndex + 1;
+    }
   }
 
   protected DoFnRunners.OutputManager createOutputManager() {
@@ -100,15 +118,10 @@ public abstract class ParDoOperation<InputT, OutputT> 
extends Operation<InputT>
   private class ParDoOutputManager implements DoFnRunners.OutputManager {
 
     @Nullable
-    private OutputReceiver getReceiverOrNull(TupleTag<?> tag) {
+    private OutputReceiver getReceiverOrNull(TupleTag<?> tupleTag) {
       List<OutputReceiver> receivers = getOutputReceivers();
-      if (tag.equals(mainOutputTag)) {
-        return receivers.get(0);
-      } else if (sideOutputTags.contains(tag)) {
-        return receivers.get(sideOutputTags.indexOf(tag) + 1);
-      } else {
-        return null;
-      }
+      int outputIndex = getOutputIndex(tupleTag);
+      return receivers.get(outputIndex);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
index 1a1373a..9bd89fd 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
@@ -36,11 +36,9 @@ class ParDoTranslator<InputT, OutputT>
         transform.getMainOutputTag(),
         transform.getAdditionalOutputTags().getAll(),
         ((PCollection) userGraphContext.getInput()).getWindowingStrategy());
-
-    context.addInitStep(Graphs.Step.of(
-        userGraphContext.getStepName(),
-        operation,
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), operation),
         userGraphContext.getInputTags(),
-        userGraphContext.getOutputTags()));
+        userGraphContext.getOutputTags());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
index 0710827..86ee78a 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
@@ -20,18 +20,17 @@ package org.apache.beam.runners.mapreduce.translation;
 import org.apache.beam.sdk.io.Read;
 
 /**
- * Translates a {@link Read.Bounded} to a {@link ReadOperation}.
+ * Translates a {@link Read.Bounded} to a {@link SourceOperation}.
  */
 class ReadBoundedTranslator<T> extends 
TransformTranslator.Default<Read.Bounded<T>> {
   @Override
   public void translateNode(Read.Bounded transform, TranslationContext 
context) {
     TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
 
-    ReadOperation operation = new ReadOperation(transform.getSource());
-    context.addInitStep(Graphs.Step.of(
-        userGraphContext.getStepName(),
-        operation,
+    SourceOperation operation = new SourceOperation(transform.getSource());
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), operation),
         userGraphContext.getInputTags(),
-        userGraphContext.getOutputTags()));
+        userGraphContext.getOutputTags());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
deleted file mode 100644
index c199dc6..0000000
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.mapreduce.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * A Read.Bounded place holder {@link Operation} during pipeline translation.
- */
-class ReadOperation<T> extends Operation<T> {
-  private final BoundedSource<T> source;
-
-  ReadOperation(BoundedSource<T> source) {
-    super(1);
-    this.source = checkNotNull(source, "source");
-  }
-
-  @Override
-  public void process(WindowedValue elem) {
-    throw new IllegalStateException(
-        String.format("%s should not in execution graph.", 
this.getClass().getSimpleName()));
-  }
-
-  BoundedSource<?> getSource() {
-    return source;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
index 83d1af5..251828e 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
@@ -34,8 +34,9 @@ public class ReifyTimestampAndWindowsParDoOperation extends 
ParDoOperation {
 
   public ReifyTimestampAndWindowsParDoOperation(
       PipelineOptions options,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    super(options, new TupleTag<>(), ImmutableList.<TupleTag<?>>of(), 
windowingStrategy);
+      WindowingStrategy<?, ?> windowingStrategy,
+      Graphs.Tag outTag) {
+    super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), 
windowingStrategy);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
new file mode 100644
index 0000000..782cfef
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * {@link Operation} that materializes input for group by key.
+ */
+public class ShuffleWriteOperation<T> extends Operation<T> {
+
+  private final Coder<Object> keyCoder;
+  private final Coder<Object> valueCoder;
+
+  private transient TaskInputOutputContext<Object, Object, Object, Object> 
taskContext;
+
+  public ShuffleWriteOperation(Coder<Object> keyCoder, Coder<Object> 
valueCoder) {
+    super(0);
+    this.keyCoder = checkNotNull(keyCoder, "keyCoder");
+    this.valueCoder = checkNotNull(valueCoder, "valueCoder");
+  }
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> 
taskContext) {
+    this.taskContext = checkNotNull(taskContext, "taskContext");
+  }
+
+  @Override
+  public void process(WindowedValue<T> elem) throws IOException, 
InterruptedException {
+    KV<?, ?> kv = (KV<?, ?>) elem.getValue();
+    ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
+    keyCoder.encode(kv.getKey(), keyStream);
+
+    ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
+    valueCoder.encode(kv.getValue(), valueStream);
+    taskContext.write(new BytesWritable(keyStream.toByteArray()), 
valueStream.toByteArray());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
new file mode 100644
index 0000000..2163f34
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A placeholder {@link Operation} for {@code Read.Bounded} during pipeline translation.
+ */
+class SourceOperation<T> extends Operation<T> {
+  private final BoundedSource<T> source;
+
+  SourceOperation(BoundedSource<T> source) {
+    super(1);
+    this.source = checkNotNull(source, "source");
+  }
+
+  @Override
+  public void process(WindowedValue elem) {
+    throw new IllegalStateException(
+        String.format("%s should not in execution graph.", 
this.getClass().getSimpleName()));
+  }
+
+  BoundedSource<?> getSource() {
+    return source;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
index 365bdc0..da8ebff 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -53,8 +53,8 @@ public class TranslationContext {
     return userGraphContext;
   }
 
-  public void addInitStep(Graphs.Step step) {
-    initGraph.addStep(step);
+  public void addInitStep(Graphs.Step step, List<Graphs.Tag> inTags, 
List<Graphs.Tag> outTags) {
+    initGraph.addStep(step, inTags, outTags);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
index f79260a..e51d392 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
@@ -26,16 +26,11 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Lookup table mapping PTransform types to associated TransformTranslator 
implementations.
  */
 public class TranslatorRegistry {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(TranslatorRegistry.class);
-
   private static final Map<Class<? extends PTransform>, TransformTranslator> 
TRANSLATORS =
       new HashMap<>();
 
@@ -49,10 +44,6 @@ public class TranslatorRegistry {
   }
 
   public static TransformTranslator<?> getTranslator(PTransform<?, ?> 
transform) {
-    TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
-    if (translator == null) {
-      LOG.warn("Unsupported operator={}", transform.getClass().getName());
-    }
-    return translator;
+    return TRANSLATORS.get(transform.getClass());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
index 815ce77..d5eac73 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
@@ -32,11 +32,9 @@ public class ViewTranslator extends 
TransformTranslator.Default<View.CreatePColl
 
     ViewOperation<?> operation =
         new ViewOperation<>((Coder) 
transform.getView().getPCollection().getCoder());
-
-    context.addInitStep(Graphs.Step.of(
-        userGraphContext.getStepName(),
-        operation,
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), operation),
         userGraphContext.getInputTags(),
-        userGraphContext.getOutputTags()));
+        userGraphContext.getOutputTags());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
index 367c375..3908870 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
@@ -29,10 +29,9 @@ public class WindowAssignTranslator<T> extends 
TransformTranslator.Default<Windo
     TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
 
     WindowAssignOperation<T, ?> operation = new 
WindowAssignOperation<>(transform.getWindowFn());
-    context.addInitStep(Graphs.Step.of(
-        userGraphContext.getStepName(),
-        operation,
+    context.addInitStep(
+        Graphs.Step.of(userGraphContext.getStepName(), operation),
         userGraphContext.getInputTags(),
-        userGraphContext.getOutputTags()));
+        userGraphContext.getOutputTags());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
deleted file mode 100644
index 2eb4684..0000000
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.mapreduce.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Throwables;
-import java.io.ByteArrayOutputStream;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-/**
- * {@link Operation} that materializes input for group by key.
- */
-public class WriteOperation<T> extends Operation<T> {
-
-  private final Coder<Object> keyCoder;
-  private final Coder<Object> valueCoder;
-
-  private transient TaskInputOutputContext<Object, Object, Object, Object> 
taskContext;
-
-  public WriteOperation(Coder<Object> keyCoder, Coder<Object> valueCoder) {
-    super(0);
-    this.keyCoder = checkNotNull(keyCoder, "keyCoder");
-    this.valueCoder = checkNotNull(valueCoder, "valueCoder");
-  }
-
-  @Override
-  public void start(TaskInputOutputContext<Object, Object, Object, Object> 
taskContext) {
-    this.taskContext = checkNotNull(taskContext, "taskContext");
-  }
-
-  @Override
-  public void process(WindowedValue<T> elem) {
-    KV<?, ?> kv = (KV<?, ?>) elem.getValue();
-    try {
-      ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
-      keyCoder.encode(kv.getKey(), keyStream);
-
-      ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
-      valueCoder.encode(kv.getValue(), valueStream);
-      taskContext.write(new BytesWritable(keyStream.toByteArray()), 
valueStream.toByteArray());
-    } catch (Exception e) {
-      Throwables.throwIfUnchecked(e);
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
 
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
index cf5262f..ac965cb 100644
--- 
a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
+++ 
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
@@ -55,7 +55,8 @@ public class GraphPlannerTest {
     p.traverseTopologically(graphConverter);
 
     GraphPlanner planner = new GraphPlanner();
-    Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph());
+    Graphs.FusedGraph fusedGraph = new 
Graphs.FusedGraph(context.getInitGraph());
+    fusedGraph = planner.plan(fusedGraph);
 
     assertEquals(1, Iterables.size(fusedGraph.getFusedSteps()));
     assertEquals(3, 
Iterables.getOnlyElement(fusedGraph.getFusedSteps()).getSteps().size());

Reply via email to