mr-runner: refactors and creates Graph data structures to handle general Beam 
pipelines.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16e63205
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16e63205
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16e63205

Branch: refs/heads/mr-runner
Commit: 16e63205bee3ade711ebffc3c74e18aec6d50c01
Parents: ee1cce9
Author: Pei He <p...@apache.org>
Authored: Fri Jul 28 16:31:41 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:48 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/MapReducePipelineOptions.java     |  17 +
 .../runners/mapreduce/MapReduceRegistrar.java   |  17 +
 .../beam/runners/mapreduce/MapReduceRunner.java |  48 ++-
 .../runners/mapreduce/MapReduceWordCount.java   | 218 ----------
 .../beam/runners/mapreduce/package-info.java    |  21 +
 .../mapreduce/translation/BeamInputFormat.java  |  21 +-
 .../mapreduce/translation/BeamMapper.java       |  19 +-
 .../mapreduce/translation/BeamReducer.java      |  20 +-
 .../mapreduce/translation/FlattenOperation.java |  37 ++
 .../translation/FlattenTranslator.java          |  37 ++
 .../runners/mapreduce/translation/Graph.java    | 400 +++++++------------
 .../mapreduce/translation/GraphConverter.java   | 108 ++---
 .../mapreduce/translation/GraphPlanner.java     | 142 ++-----
 .../mapreduce/translation/GraphVisitor.java     |  11 -
 .../runners/mapreduce/translation/Graphs.java   | 188 +++++++++
 .../GroupAlsoByWindowsParDoOperation.java       |  24 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  17 +
 .../translation/GroupByKeyOperation.java        |  54 +++
 .../translation/GroupByKeyTranslator.java       |  46 +++
 .../mapreduce/translation/JobPrototype.java     | 257 +++++-------
 .../translation/NormalParDoOperation.java       |  12 +-
 .../mapreduce/translation/Operation.java        |  30 +-
 .../mapreduce/translation/OutputReceiver.java   |   3 +-
 .../mapreduce/translation/ParDoOperation.java   |  14 +-
 .../mapreduce/translation/ParDoTranslator.java  |  46 +++
 .../translation/ReadBoundedTranslator.java      |  37 ++
 .../mapreduce/translation/ReadOperation.java    |  45 +++
 .../ReifyTimestampAndWindowsParDoOperation.java |  24 +-
 .../translation/TransformTranslator.java        |  48 +++
 .../translation/TranslationContext.java         | 128 ++++++
 .../translation/TranslatorRegistry.java         |  58 +++
 .../mapreduce/translation/ViewOperation.java    |  59 +++
 .../mapreduce/translation/ViewTranslator.java   |  42 ++
 .../translation/WindowAssignOperation.java      |  35 +-
 .../translation/WindowAssignTranslator.java     |  38 ++
 .../mapreduce/translation/WriteOperation.java   |  33 +-
 .../beam/runners/mapreduce/WordCountTest.java   |  25 +-
 .../translation/GraphConverterTest.java         |  33 +-
 .../mapreduce/translation/GraphPlannerTest.java |  37 +-
 39 files changed, 1568 insertions(+), 881 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index 7fe66ba..73c7d47 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce;
 
 import com.google.common.collect.ImmutableSet;

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
index eb960b8..c8b0eea 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce;
 
 import com.google.auto.service.AutoService;

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 11ac9a7..b6a82d1 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -1,23 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Throwables;
-import org.apache.beam.runners.mapreduce.translation.Graph;
+import org.apache.beam.runners.mapreduce.translation.Graphs;
 import org.apache.beam.runners.mapreduce.translation.GraphConverter;
 import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
 import org.apache.beam.runners.mapreduce.translation.JobPrototype;
+import org.apache.beam.runners.mapreduce.translation.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 
 /**
- * {@link PipelineRunner} for crunch.
+ * {@link PipelineRunner} for MapReduce.
  */
 public class MapReduceRunner extends PipelineRunner<PipelineResult> {
 
@@ -39,22 +55,20 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> {
 
   @Override
   public PipelineResult run(Pipeline pipeline) {
-    GraphConverter graphConverter = new GraphConverter();
+    TranslationContext context = new TranslationContext(options);
+    GraphConverter graphConverter = new GraphConverter(context);
     pipeline.traverseTopologically(graphConverter);
 
-    Graph graph = graphConverter.getGraph();
-
     GraphPlanner planner = new GraphPlanner();
-    Graph fusedGraph = planner.plan(graph);
-    for (Graph.Vertex vertex : fusedGraph.getAllVertices()) {
-      if (vertex.getStep().getTransform() instanceof GroupByKey) {
-        JobPrototype jobPrototype = JobPrototype.create(1, vertex);
-        try {
-          Job job = jobPrototype.build(options.getJarClass(), new Configuration());
-          job.waitForCompletion(true);
-        } catch (Exception e) {
-          Throwables.throwIfUnchecked(e);
-        }
+    Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph());
+    int stageId = 0;
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      JobPrototype jobPrototype = JobPrototype.create(stageId++, fusedStep, options);
+      try {
+        Job job = jobPrototype.build(options.getJarClass(), new Configuration());
+        job.waitForCompletion(true);
+      } catch (Exception e) {
+        Throwables.throwIfUnchecked(e);
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
deleted file mode 100644
index d0c7b78..0000000
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
+++ /dev/null
@@ -1,218 +0,0 @@
-package org.apache.beam.runners.mapreduce;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.StringTokenizer;
-
-import javax.annotation.Nullable;
-import org.apache.beam.runners.mapreduce.translation.BeamInputFormat;
-import org.apache.beam.runners.mapreduce.translation.BeamMapper;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.OffsetBasedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.log4j.BasicConfigurator;
-
-public class MapReduceWordCount {
-
-  public static class CreateSource<T> extends OffsetBasedSource<T> {
-    private final List<byte[]> allElementsBytes;
-    private final long totalSize;
-    private final Coder<T> coder;
-
-    public static <T> CreateSource<T> fromIterable(Iterable<T> elements, 
Coder<T> elemCoder)
-        throws CoderException, IOException {
-      ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
-      long totalSize = 0L;
-      for (T element : elements) {
-        byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
-        allElementsBytes.add(bytes);
-        totalSize += bytes.length;
-      }
-      return new CreateSource<>(allElementsBytes.build(), totalSize, 
elemCoder);
-    }
-
-    /**
-     * Create a new source with the specified bytes. The new source owns the 
input element bytes,
-     * which must not be modified after this constructor is called.
-     */
-    private CreateSource(List<byte[]> elementBytes, long totalSize, Coder<T> 
coder) {
-      super(0, elementBytes.size(), 1);
-      this.allElementsBytes = ImmutableList.copyOf(elementBytes);
-      this.totalSize = totalSize;
-      this.coder = coder;
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws 
Exception {
-      return totalSize;
-    }
-
-    @Override
-    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options)
-        throws IOException {
-      return new BytesReader<>(this);
-    }
-
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
-    }
-
-    @Override
-    public long getMaxEndOffset(PipelineOptions options) throws Exception {
-      return allElementsBytes.size();
-    }
-
-    @Override
-    public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
-      List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) 
end);
-      long primarySizeEstimate =
-          (long) (totalSize * primaryElems.size() / (double) 
allElementsBytes.size());
-      return new CreateSource<>(primaryElems, primarySizeEstimate, coder);
-    }
-
-    @Override
-    public long getBytesPerOffset() {
-      if (allElementsBytes.size() == 0) {
-        return 1L;
-      }
-      return Math.max(1, totalSize / allElementsBytes.size());
-    }
-
-    private static class BytesReader<T> extends OffsetBasedReader<T> {
-      private int index;
-      /**
-       * Use an optional to distinguish between null next element (as 
Optional.absent()) and no next
-       * element (next is null).
-       */
-      @Nullable
-      private Optional<T> next;
-
-      public BytesReader(CreateSource<T> source) {
-        super(source);
-        index = -1;
-      }
-
-      @Override
-      @Nullable
-      public T getCurrent() throws NoSuchElementException {
-        if (next == null) {
-          throw new NoSuchElementException();
-        }
-        return next.orNull();
-      }
-
-      @Override
-      public void close() throws IOException {}
-
-      @Override
-      protected long getCurrentOffset() {
-        return index;
-      }
-
-      @Override
-      protected boolean startImpl() throws IOException {
-        return advanceImpl();
-      }
-
-      @Override
-      public synchronized CreateSource<T> getCurrentSource() {
-        return (CreateSource<T>) super.getCurrentSource();
-      }
-
-      @Override
-      protected boolean advanceImpl() throws IOException {
-        CreateSource<T> source = getCurrentSource();
-        if (index + 1 >= source.allElementsBytes.size()) {
-          next = null;
-          return false;
-        }
-        index++;
-        next =
-            Optional.fromNullable(
-                CoderUtils.decodeFromByteArray(source.coder, 
source.allElementsBytes.get(index)));
-        return true;
-      }
-    }
-  }
-
-  public static class TokenizerMapper
-      extends Mapper<Object, Text, Text, IntWritable>{
-
-    private final static IntWritable one = new IntWritable(1);
-    private Text word = new Text();
-
-    public void map(Object key, Text value, Context context
-    ) throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      while (itr.hasMoreTokens()) {
-        word.set(itr.nextToken());
-        context.write(word, one);
-      }
-    }
-  }
-
-  public static class IntSumReducer
-      extends Reducer<Text, IntWritable, Text, IntWritable> {
-    private IntWritable result = new IntWritable();
-
-    public void reduce(Text key, Iterable<IntWritable> values, Context context)
-        throws IOException, InterruptedException {
-      int sum = 0;
-      for (IntWritable val : values) {
-        sum += val.get();
-      }
-      result.set(sum);
-      context.write(key, result);
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    BasicConfigurator.configure();
-
-    Configuration conf = new Configuration();
-
-    BoundedSource<KV<String, Integer>> source = CreateSource.fromIterable(
-        ImmutableList.of(KV.of("k1", 10), KV.of("k2", 2)),
-        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
-
-    conf.set(
-        BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
-        
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source)));
-
-    Job job = Job.getInstance(conf, "word count");
-    job.setJarByClass(MapReduceWordCount.class);
-    job.setInputFormatClass(BeamInputFormat.class);
-    job.setMapperClass(BeamMapper.class);
-    //job.setMapperClass(TokenizerMapper.class);
-    //job.setCombinerClass(IntSumReducer.class);
-    //job.setReducerClass(IntSumReducer.class);
-    //job.setOutputKeyClass(Text.class);
-    //job.setOutputValueClass(IntWritable.class);
-    //FileInputFormat.addInputPath(job, new Path(args[0]));
-    job.setOutputFormatClass(NullOutputFormat.class);
-    System.exit(job.waitForCompletion(true) ? 0 : 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
new file mode 100644
index 0000000..d511405
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * MapReduce runner implementation.
+ */
+package org.apache.beam.runners.mapreduce;

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 0cfb14b..8a27a85 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -14,8 +31,6 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -66,6 +81,7 @@ public class BeamInputFormat<T> extends InputFormat {
   @Override
   public RecordReader createRecordReader(
       InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    // TODO: it should be initialized from the InputSplit.
     source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray(
         
Base64.decodeBase64(context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE)),
         "");
@@ -121,6 +137,7 @@ public class BeamInputFormat<T> extends InputFormat {
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
       if (!started) {
+        started = true;
         return reader.start();
       } else {
         return reader.advance();

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index b5e4edc..bc52967 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -9,7 +26,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /**
- * Created by peihe on 21/07/2017.
+ * Adapter for executing Beam transforms in {@link Mapper}.
  */
 public class BeamMapper<ValueInT, ValueOutT>
     extends Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
index 9b8bd82..3490b3b 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -19,8 +36,9 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
+
 /**
- * Created by peihe on 25/07/2017.
+ * Adapter for executing Beam transforms in {@link Reducer}.
  */
 public class BeamReducer<ValueInT, ValueOutT>
     extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
new file mode 100644
index 0000000..191b346
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Flatten operation.
+ */
+public class FlattenOperation<T> extends Operation<T> {
+
+  public FlattenOperation() {
+    super(1);
+  }
+
+  @Override
+  public void process(WindowedValue elem) {
+    for (OutputReceiver receiver : getOutputReceivers()) {
+      receiver.process(elem);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
new file mode 100644
index 0000000..8860caf
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.Flatten;
+
+/**
+ * Translates a {@link Flatten} to a {@link FlattenOperation}.
+ */
+public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PCollections<T>> {
+  @Override
+  public void translateNode(Flatten.PCollections<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    Operation<?> operation = new FlattenOperation();
+    context.addInitStep(Graphs.Step.of(
+        userGraphContext.getStepName(),
+        operation,
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index e360419..b6900cc 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -1,311 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.util.LinkedList;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.graph.ElementOrder;
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.commons.lang.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
 
 /**
- * Created by peihe on 06/07/2017.
+ * Graph that represents a Beam DAG.
  */
-public class Graph {
+public class Graph<StepT extends Graph.AbstractStep<TagT>, TagT extends 
Graph.AbstractTag> {
 
-  private final Map<Step, Vertex> vertices;
-  private final Map<HeadTail, Edge> edges;
-  private final Set<Vertex> leafVertices;
+  private final MutableGraph<Vertex> graph;
 
   public Graph() {
-    this.vertices = Maps.newHashMap();
-    this.edges = Maps.newHashMap();
-    this.leafVertices = Sets.newHashSet();
+    this.graph = GraphBuilder.directed() // edges run tag to step and step to tag
+        .allowsSelfLoops(false)
+        .nodeOrder(ElementOrder.insertion()) // nodes() iterates in the order nodes were added
+        .build();
   }
 
-  public Vertex addVertex(Step step) {
-    checkState(!vertices.containsKey(step));
-    Vertex v = new Vertex(step);
-    vertices.put(step, v);
-    leafVertices.add(v);
-    return v;
+  /**
+   * Adds {@link StepT} to this {@link Graph}, linking it to each of its input and output tags.
+   */
+  public void addStep(StepT step) {
+    graph.addNode(step);
+    Set<Vertex> nodes = graph.nodes(); // presumably a live view of the node set - TODO confirm
+    for (TagT tag : step.getInputTags()) {
+      if (!nodes.contains(tag)) {
+        graph.addNode(tag);
+      }
+      graph.putEdge(tag, step); // edge direction: input tag to step
+    }
+    for (TagT tag : step.getOutputTags()) {
+      if (!nodes.contains(tag)) {
+        graph.addNode(tag);
+      }
+      graph.putEdge(step, tag); // edge direction: step to output tag
+    }
   }
 
-  public Edge addEdge(Vertex head, Vertex tail, Coder<?> coder) {
-    HeadTail headTail = HeadTail.of(head, tail);
-    checkState(!edges.containsKey(headTail));
-    Edge e = new Edge(headTail, coder);
-    edges.put(headTail, e);
-    head.addOutgoing(e);
-    tail.addIncoming(e);
-    leafVertices.remove(head);
-    return e;
+  public void removeStep(StepT step) {
+    graph.removeNode(step); // Guava removeNode also removes every edge incident to the node
   }
 
-  public Vertex getVertex(Step step) {
-    return vertices.get(step);
+  public void removeTag(TagT tag) {
+    graph.removeNode(tag); // Guava removeNode also removes every edge incident to the node
   }
 
-  public Edge getEdge(Vertex head, Vertex tail) {
-    return edges.get(HeadTail.of(head, tail));
+  public void addEdge(TagT inTag, StepT step) {
+    graph.putEdge(inTag, step); // records inTag as an input of step
   }
 
-  public Iterable<Vertex> getAllVertices() {
-    return vertices.values();
+  public void addEdge(StepT step, TagT outTag) {
+    graph.putEdge(step, outTag); // records outTag as an output of step
   }
 
-  public Iterable<Edge> getAllEdges() {
-    return edges.values();
+  public void removeEdge(TagT inTag, StepT step) {
+    graph.removeEdge(inTag, step); // only the edge is removed here, not the two nodes
   }
 
-  public Iterable<Vertex> getLeafVertices() {
-    return ImmutableList.copyOf(leafVertices);
+  public void removeEdge(StepT step, TagT outTag) {
+    graph.removeEdge(step, outTag); // only the edge is removed here, not the two nodes
   }
 
-  public void accept(GraphVisitor visitor) {
-    for (Vertex v : leafVertices) {
-      v.accept(visitor);
-    }
+  public List<StepT> getSteps() { // every step vertex, in node iteration order
+    return castToStepList(FluentIterable.from(graph.nodes())
+        .filter(new Predicate<Vertex>() {
+          @Override
+          public boolean apply(Vertex input) {
+            return input instanceof AbstractStep; // tag vertices are filtered out
+          }}))
+        .toList();
   }
 
-  //TODO: add equals, hashCode, toString for following classses.
-
-  public static class Vertex {
-    private final Step step;
-    private final Set<Edge> incoming;
-    private final Set<Edge> outgoing;
-
-    public Vertex(Step step) {
-      this.step = checkNotNull(step, "step");
-      this.incoming = Sets.newHashSet();
-      this.outgoing = Sets.newHashSet();
-    }
-
-    public Step getStep() {
-      return step;
-    }
-
-    public Set<Edge> getIncoming() {
-      return incoming;
-    }
-
-    public Set<Edge> getOutgoing() {
-      return outgoing;
-    }
-
-    public boolean isSource() {
-      PTransform<?, ?> transform = step.getTransform();
-      return transform instanceof Read.Bounded || transform instanceof 
Read.Unbounded;
-    }
-
-    public boolean isGroupByKey() {
-      return step.getTransform() instanceof GroupByKey;
-    }
-
-    public void addIncoming(Edge edge) {
-      incoming.add(edge);
-    }
-
-    public void addOutgoing(Edge edge) {
-      outgoing.add(edge);
-    }
-
-    public void accept(GraphVisitor visitor) {
-      PTransform<?, ?> transform = step.getTransform();
-      if (transform instanceof ParDo.SingleOutput || transform instanceof 
ParDo.MultiOutput
-          || transform instanceof Window.Assign) {
-        visitor.visitParDo(this);
-      } else if (transform instanceof GroupByKey) {
-        visitor.visitGroupByKey(this);
-      } else if (transform instanceof Read.Bounded) {
-        visitor.visitRead(this);
-      } else if (transform instanceof Flatten.PCollections
-          || transform instanceof Flatten.Iterables) {
-        visitor.visitFlatten(this);
-      } else {
-        throw new RuntimeException("Unexpected transform type: " + 
transform.getClass());
-      }
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-      if (obj instanceof Vertex) {
-        Vertex other = (Vertex) obj;
-        return step.equals(other.step);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(this.getClass(), step);
-    }
-
-    @Override
-    public String toString() {
-      return new ReflectionToStringBuilder(this, 
ToStringStyle.SHORT_PREFIX_STYLE)
-          .setExcludeFieldNames(new String[] { "outgoing", "incoming" 
}).toString();
-    }
+  public List<StepT> getStartSteps() { // steps that have no incoming edge in this graph
+    return castToStepList(FluentIterable.from(graph.nodes())
+        .filter(new Predicate<Vertex>() {
+          @Override
+          public boolean apply(Vertex input) {
+            return input instanceof AbstractStep && graph.inDegree(input) == 0;
+          }}))
+        .toList();
   }
 
-  public static class Edge {
-    private final HeadTail headTail;
-    private final Coder<?> coder;
-    private final Set<NodePath> paths;
-
-    public static Edge of(HeadTail headTail, Coder<?> coder) {
-      return new Edge(headTail, coder);
-    }
-
-    private Edge(HeadTail headTail, Coder<?> coder) {
-      this.headTail = checkNotNull(headTail, "headTail");
-      this.coder = checkNotNull(coder, "coder");
-      this.paths = Sets.newHashSet();
-    }
-
-    public Vertex getHead() {
-      return headTail.getHead();
-    }
-
-    public Vertex getTail() {
-      return headTail.getTail();
-    }
-
-    public Coder<?> getCoder() {
-      return coder;
-    }
-
-    public Set<NodePath> getPaths() {
-      return paths;
-    }
-
-    public void addPath(NodePath path) {
-      paths.add(checkNotNull(path, "path"));
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-      if (obj instanceof Edge) {
-        Edge other = (Edge) obj;
-        return headTail.equals(other.headTail)
-            && paths.equals(other.paths) && coder.equals(other.coder);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(headTail, paths, coder);
-    }
-
-    @Override
-    public String toString() {
-      return ReflectionToStringBuilder.toString(this, 
ToStringStyle.SHORT_PREFIX_STYLE);
-    }
+  public List<TagT> getInputTags() { // tags with no producing step in this graph
+    return castToTagList(FluentIterable.from(graph.nodes())
+        .filter(new Predicate<Vertex>() {
+          @Override
+          public boolean apply(Vertex input) {
+            return input instanceof AbstractTag && graph.inDegree(input) == 0;
+          }}))
+        .toList();
   }
 
-  public static class NodePath {
-    private final LinkedList<Step> path;
-
-    public NodePath() {
-      this.path = new LinkedList<>();
-    }
-
-    public NodePath(NodePath nodePath) {
-      this.path = new LinkedList<>(nodePath.path);
-    }
+  public List<TagT> getOutputTags() { // tags with no consuming step in this graph
+    return castToTagList(FluentIterable.from(graph.nodes())
+        .filter(new Predicate<Vertex>() {
+          @Override
+          public boolean apply(Vertex input) {
+            return input instanceof AbstractTag && graph.outDegree(input) == 0;
+          }}))
+        .toList();
+  }
 
-    public void addFirst(Step step) {
-      path.addFirst(step);
-    }
+  public StepT getProducer(TagT tag) { // getOnlyElement throws unless tag has exactly one producer
+    return (StepT) Iterables.getOnlyElement(graph.predecessors(tag));
+  }
 
-    public void addLast(Step step) {
-      path.addLast(step);
-    }
+  public List<StepT> getConsumers(TagT tag) { // the steps that tag has an edge to
+    return castToStepList(graph.successors(tag)).toList();
+  }
 
-    public Iterable<Step> steps() {
-      return ImmutableList.copyOf(path);
-    }
+  private FluentIterable<StepT> castToStepList(Iterable<Vertex> vertices) {
+    return FluentIterable.from(vertices)
+        .transform(new Function<Vertex, StepT>() {
+          @Override
+          public StepT apply(Vertex input) {
+            return (StepT) input; // unchecked cast; nothing here verifies the vertex is a step
+          }});
+  }
 
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-      if (obj instanceof NodePath) {
-        NodePath other = (NodePath) obj;
-        return path.equals(other.path);
-      }
-      return false;
-    }
+  private FluentIterable<TagT> castToTagList(Iterable<Vertex> vertices) {
+    return FluentIterable.from(vertices)
+        .transform(new Function<Vertex, TagT>() {
+          @Override
+          public TagT apply(Vertex input) {
+            return (TagT) input; // unchecked cast; nothing here verifies the vertex is a tag
+          }});
+  }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(this.getClass(), path.hashCode());
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
     }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (Step step : path) {
-        sb.append(step.getFullName() + "|");
-      }
-      if (path.size() > 0) {
-        sb.deleteCharAt(sb.length() - 1);
-      }
-      return sb.toString();
+    if (obj instanceof Graph) {
+      Graph other = (Graph) obj; // raw type, so the type arguments play no part in equality
+      return com.google.common.graph.Graphs.equivalent(this.graph, 
other.graph);
     }
+    return false; // Guava Graphs is fully qualified above: this package has its own Graphs
   }
 
-  @AutoValue
-  public abstract static class Step {
-    abstract String getFullName();
-    // TODO: remove public
-    public abstract PTransform<?, ?> getTransform();
-    abstract WindowingStrategy<?, ?> getWindowingStrategy();
-    abstract List<TupleTag<?>> getInputs();
-    abstract List<TupleTag<?>> getOutputs();
+  @Override
+  public int hashCode() {
+    return Objects.hash(this.getClass(), graph.nodes()); // edges do not contribute to the hash
+  }
 
-    public static Step of(
-        String fullName,
-        PTransform<?, ?> transform,
-        WindowingStrategy<?, ?> windowingStrategy,
-        List<TupleTag<?>> inputs,
-        List<TupleTag<?>> outputs) {
-      return new 
org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_Step(
-          fullName, transform, windowingStrategy, inputs, outputs);
-    }
+  /**
+   * Vertex interface of this Graph.
+   */
+  interface Vertex { // no members; implemented by AbstractStep and AbstractTag
   }
 
-  @AutoValue
-  public abstract static class HeadTail {
-    abstract Vertex getHead();
-    abstract Vertex getTail();
+  public abstract static class AbstractStep<TagT extends AbstractTag> 
implements Vertex {
+    public abstract List<TagT> getInputTags(); // addStep adds an edge from each of these
+    public abstract List<TagT> getOutputTags(); // addStep adds an edge to each of these
+  }
 
-    public static HeadTail of(Vertex head, Vertex tail) {
-      return new 
org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_HeadTail(head, 
tail);
-    }
+  public abstract static class AbstractTag implements Vertex { // base type of tag vertices
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
index e7e7598..1e818fa 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -1,77 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
-import com.google.common.collect.ImmutableList;
+import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Maps;
 import java.util.Map;
 import org.apache.beam.runners.mapreduce.MapReduceRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * Pipeline translator for {@link MapReduceRunner}.
  */
 public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
 
+  private final TranslationContext context;
   private final Map<PValue, TupleTag<?>> pValueToTupleTag;
-  private final Map<TupleTag<?>, Graph.Vertex> outputToProducer;
-  private final Graph graph;
 
-  public GraphConverter() {
+  public GraphConverter(TranslationContext context) {
+    this.context = checkNotNull(context, "context"); // a null context is rejected up front
     this.pValueToTupleTag = Maps.newHashMap();
-    this.outputToProducer = Maps.newHashMap();
-    this.graph = new Graph();
   }
 
   @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    WindowingStrategy<?, ?> windowingStrategy =
-        getWindowingStrategy(node.getOutputs().values().iterator().next());
-    Graph.Step step = Graph.Step.of(
-        node.getFullName(),
-        node.getTransform(),
-        windowingStrategy,
-        ImmutableList.copyOf(node.getInputs().keySet()),
-        ImmutableList.copyOf(node.getOutputs().keySet()));
-    Graph.Vertex v = graph.addVertex(step);
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
+    // check if current composite transforms need to be translated.
+    // If not, all sub transforms will be translated in 
visitPrimitiveTransform.
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null) { // a node without a transform is entered without a registry lookup
+      TransformTranslator translator = 
TranslatorRegistry.getTranslator(transform);
 
-    for (PValue pValue : node.getInputs().values()) {
-      TupleTag<?> tag = pValueToTupleTag.get(pValue);
-      if (outputToProducer.containsKey(tag)) {
-        Graph.Vertex producer = outputToProducer.get(tag);
-
-        PCollection<?> pc = (PCollection<?>) pValue;
-        graph.addEdge(producer, v, pc.getCoder());
+      if (translator != null && applyCanTranslate(transform, node, 
translator)) {
+        applyTransform(transform, node, translator);
+        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; // translated as a unit; skip children
       }
     }
+    return CompositeBehavior.ENTER_TRANSFORM; // not translated here, so descend into children
+  }
 
-    for (Map.Entry<TupleTag<?>, PValue> entry : node.getOutputs().entrySet()) {
-      pValueToTupleTag.put(entry.getValue(), entry.getKey());
-      outputToProducer.put(entry.getKey(), v);
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    if (!node.isRootNode()) { // the root node is skipped without any translation
+      PTransform<?, ?> transform = node.getTransform();
+      TransformTranslator translator = 
TranslatorRegistry.getTranslator(transform);
+      if (translator == null || !applyCanTranslate(transform, node, 
translator)) {
+        throw new UnsupportedOperationException( // no fallback exists for a primitive
+            "The transform " + transform + " is currently not supported.");
+      }
+      applyTransform(transform, node, translator);
     }
   }
 
-  private WindowingStrategy<?, ?> getWindowingStrategy(PValue pValue) {
-    if (pValue instanceof PCollection) {
-      return ((PCollection) pValue).getWindowingStrategy();
-    } else if (pValue instanceof PCollectionList) {
-      return ((PCollectionList) pValue).get(0).getWindowingStrategy();
-    } else if (pValue instanceof PCollectionTuple) {
-      return ((PCollectionTuple) 
pValue).getAll().values().iterator().next().getWindowingStrategy();
-    } else if (pValue instanceof PCollectionView) {
-      return ((PCollectionView) 
pValue).getPCollection().getWindowingStrategy();
-    } else {
-      throw new RuntimeException("Unexpected pValue type: " + 
pValue.getClass());
-    }
+  private <T extends PTransform<?, ?>> void applyTransform(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      TransformTranslator<?> translator) {
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform; // T only ties the transform type to the translator type
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) 
translator;
+    context.getUserGraphContext().setCurrentNode(node); // set before the translator runs
+    typedTranslator.translateNode(typedTransform, context);
   }
 
-  public Graph getGraph() {
-    return graph;
+  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      TransformTranslator<?> translator) {
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform; // T only ties the transform type to the translator type
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) 
translator;
+    context.getUserGraphContext().setCurrentNode(node); // set before the translator is asked
+    return typedTranslator.canTranslate(typedTransform, context);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 9ae8365..be694e4 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -1,117 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.Coder;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
 /**
- * Created by peihe on 06/07/2017.
+ * Class that optimizes the initial graph to a fused graph.
  */
 public class GraphPlanner {
 
-  public Graph plan(Graph initGraph) {
-    FusionVisitor fusionVisitor = new FusionVisitor();
-    initGraph.accept(fusionVisitor);
-    return fusionVisitor.getFusedGraph();
-  }
-
-  private class FusionVisitor implements GraphVisitor {
 
-    private Graph fusedGraph;
-    private Graph.Vertex workingVertex;
-    private Graph.NodePath workingPath;
-    private Coder<?> workingEdgeCoder;
-
-    FusionVisitor() {
-      fusedGraph = new Graph();
-      workingVertex = null;
-      workingPath = null;
-    }
-
-    @Override
-    public void visitRead(Graph.Vertex read) {
-      if (workingVertex == null) {
-        // drop if read is leaf vertex.
-        return;
-      }
-      Graph.Vertex v = fusedGraph.addVertex(read.getStep());
-      workingPath.addFirst(read.getStep());
-      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex, workingEdgeCoder);
-      edge.addPath(workingPath);
-    }
-
-    @Override
-    public void visitParDo(Graph.Vertex parDo) {
-      Graph.Step step = parDo.getStep();
-      checkArgument(
-          step.getTransform().getAdditionalInputs().isEmpty(),
-          "Side inputs are not " + "supported.");
-      checkArgument(
-          parDo.getIncoming().size() == 1,
-          "Side inputs are not supported.");
-      Graph.Edge inEdge = parDo.getIncoming().iterator().next();
+  public GraphPlanner() { // nothing to initialise
+  }
 
-      if (workingVertex == null) {
-        // Leaf vertex
-        workingVertex = fusedGraph.addVertex(step);
-        workingPath = new Graph.NodePath();
-        workingEdgeCoder = inEdge.getCoder();
-      } else {
-        workingPath.addFirst(step);
-      }
-      processParent(inEdge.getHead());
-    }
+  public Graphs.FusedGraph plan(Graph<Graphs.Step, Graphs.Tag> initGraph) {
+    Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph();
+    // Convert from the list of steps to Graphs.
+    for (Graphs.Step step : Lists.reverse(initGraph.getSteps())) { // last-added step first
+      Graphs.FusedStep fusedStep = new Graphs.FusedStep(); // starts as a single-step group
+      fusedStep.addStep(step);
+      fusedGraph.addFusedStep(fusedStep);
 
-    @Override
-    public void visitFlatten(Graph.Vertex flatten) {
-      if (workingVertex == null) {
-        return;
-      }
-      Graph.NodePath basePath = workingPath;
-      Graph.Vertex baseVertex = workingVertex;
-      for (Graph.Edge e : flatten.getIncoming()) {
-        workingPath = new Graph.NodePath(basePath);
-        workingVertex = baseVertex;
-        workingEdgeCoder = e.getCoder();
-        processParent(e.getHead());
-      }
+      tryFuse(fusedGraph, fusedStep); // presumably its consumers were added on earlier passes
     }
+    return fusedGraph;
+  }
 
-    @Override
-    public void visitGroupByKey(Graph.Vertex groupByKey) {
-      if (workingVertex == null) {
-        return;
-      }
-      Graph.Step step = groupByKey.getStep();
-      Graph.Vertex addedGroupByKey = fusedGraph.addVertex(step);
-
-      Graph.Edge edge = fusedGraph.addEdge(
-          addedGroupByKey,
-          workingVertex,
-          workingEdgeCoder);
-      edge.addPath(workingPath);
-      Graph.Edge inEdge = groupByKey.getIncoming().iterator().next();
-      workingVertex = addedGroupByKey;
-      workingPath = new Graph.NodePath();
-      workingEdgeCoder = inEdge.getCoder();
-      processParent(inEdge.getHead());
+  private void tryFuse(Graphs.FusedGraph fusedGraph, Graphs.FusedStep 
fusedStep) {
+    if (fusedStep.getOutputTags().size() != 1) { // only single-output groups are fused
+      return;
     }
-
-    public Graph getFusedGraph() {
-      return fusedGraph;
+    Graphs.Tag outTag = Iterables.getOnlyElement(fusedStep.getOutputTags());
+    if (fusedGraph.getConsumers(outTag).size() != 1) { // the output needs exactly one consumer
+      return;
     }
-
-    private void processParent(Graph.Vertex parent) {
-      Graph.Step step = parent.getStep();
-      Graph.Vertex v = fusedGraph.getVertex(step);
-      if (v == null) {
-        parent.accept(this);
-      } else {
-        // TODO: parent is consumed more than once.
-        // It is duplicated in multiple outgoing path. Figure out the impact.
-        workingPath.addFirst(step);
-        fusedGraph.getEdge(v, workingVertex).addPath(workingPath);
-      }
+    Graphs.FusedStep consumer = 
Iterables.getOnlyElement(fusedGraph.getConsumers(outTag));
+    if (fusedStep.containsGroupByKey() && consumer.containsGroupByKey()) {
+      return; // two GroupByKey steps are never placed in the same FusedStep
     }
+    fusedGraph.merge(fusedStep, consumer); // fusedStep is folded into consumer and removed
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java
deleted file mode 100644
index fe4a76f..0000000
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.beam.runners.mapreduce.translation;
-
-/**
- * Created by peihe on 06/07/2017.
- */
-public interface GraphVisitor {
-  void visitRead(Graph.Vertex read);
-  void visitParDo(Graph.Vertex parDo);
-  void visitFlatten(Graph.Vertex flatten);
-  void visitGroupByKey(Graph.Vertex groupByKey);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
new file mode 100644
index 0000000..029d425
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Class that defines graph vertices.
+ */
+public class Graphs {
+
+  private Graphs() {} // never instantiated; this class only holds the nested types
+
+  public static class FusedGraph { // a Graph whose step vertices are FusedStep groups
+    private final Graph<FusedStep, Tag> graph;
+
+    public FusedGraph() {
+      this.graph = new Graph<>();
+    }
+
+    public void addFusedStep(FusedStep fusedStep) {
+      graph.addStep(fusedStep);
+    }
+
+    public void merge(FusedStep src, FusedStep dest) { // moves all of src into dest, drops src
+      for (Step step : src.steps.getSteps()) {
+        dest.addStep(step);
+      }
+      for (Tag inTag : src.getInputTags()) {
+        graph.addEdge(inTag, dest); // inputs of src now feed dest
+      }
+      for (Tag outTag : src.getOutputTags()) {
+        graph.addEdge(dest, outTag); // NOTE(review): outTag may already feed dest - cycle?
+      }
+      graph.removeStep(src);
+    }
+
+    public FusedStep getProducer(Tag tag) {
+      return graph.getProducer(tag);
+    }
+
+    public List<FusedStep> getConsumers(Tag tag) {
+      return graph.getConsumers(tag);
+    }
+
+    public List<FusedStep> getFusedSteps() {
+      return graph.getSteps();
+    }
+  }
+
+  public static class FusedStep extends Graph.AbstractStep<Tag> { // a group of Steps
+    private final Graph<Step, Tag> steps;
+    private Step groupByKeyStep; // last added step holding a GroupByKeyOperation, else null
+
+    public FusedStep() {
+      this.steps = new Graph<>();
+      this.groupByKeyStep = null;
+    }
+
+    @Override
+    public List<Tag> getInputTags() {
+      return steps.getInputTags(); // tags that no inner step produces
+    }
+
+    @Override
+    public List<Tag> getOutputTags() {
+      return steps.getOutputTags(); // tags that no inner step consumes
+    }
+
+    public void addStep(Step step) {
+      steps.addStep(step);
+      if (step.getOperation() instanceof GroupByKeyOperation) {
+        groupByKeyStep = step; // a later GroupByKey step replaces an earlier one here
+      }
+    }
+
+    public void removeStep(Step step) { // NOTE(review): groupByKeyStep is not reset here
+      steps.removeStep(step);
+    }
+
+    public void removeTag(Tag tag) {
+      steps.removeTag(tag);
+    }
+
+    public void addEdge(Tag inTag, Step step) {
+      steps.addEdge(inTag, step);
+    }
+
+    public void addEdge(Step step, Tag outTag) {
+      steps.addEdge(step, outTag);
+    }
+
+    public void removeEdge(Tag inTag, Step step) {
+      steps.removeEdge(inTag, step);
+    }
+
+    public void removeEdge(Step step, Tag outTag) {
+      steps.removeEdge(step, outTag);
+    }
+
+    public Step getProducer(Tag tag) {
+      return steps.getProducer(tag);
+    }
+
+    public List<Step> getConsumers(Tag tag) {
+      return steps.getConsumers(tag);
+    }
+
+    public List<Step> getSteps() {
+      return steps.getSteps();
+    }
+
+    public List<Step> getStartSteps() {
+      return steps.getStartSteps();
+    }
+
+    public boolean containsGroupByKey() {
+      return groupByKeyStep != null;
+    }
+
+    @Nullable
+    public Step getGroupByKeyStep() {
+      return groupByKeyStep;
+    }
+
+    @Override
+    public String toString() { // full names of the inner steps, separated by a pipe character
+      StringBuilder sb = new StringBuilder();
+      for (Step step : steps.getSteps()) {
+        sb.append(step.getFullName() + "|");
+      }
+      if (sb.length() > 0) {
+        sb.deleteCharAt(sb.length() - 1); // drops the trailing separator
+      }
+      return sb.toString();
+    }
+
+    public String getFullName() {
+      return toString();
+    }
+  }
+
+  @AutoValue
+  public abstract static class Step extends Graph.AbstractStep<Tag> {
+    abstract String getFullName();
+    // TODO: remove public
+    public abstract Operation getOperation();
+
+    public static Step of(
+        String fullName,
+        Operation operation,
+        List<Tag> inputTags,
+        List<Tag> outputTags) {
+      return new 
org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Step(
+          inputTags, outputTags, fullName, operation); // NOTE(review): not the of() order
+    }
+  }
+
+  @AutoValue
+  public abstract static class Tag extends Graph.AbstractTag {
+    abstract TupleTag<?> getTupleTag();
+    abstract Coder<?> getCoder();
+
+    public static Tag of(TupleTag<?> tupleTag, Coder<?> coder) {
+      return new 
org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Tag(
+          tupleTag, coder);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
index 1da8d26..66cf3b6 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
@@ -1,7 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.collect.ImmutableList;
 import java.util.List;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
@@ -11,7 +29,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
- * Created by peihe on 26/07/2017.
+ * {@link Operation} that executes a {@link GroupAlsoByWindowsViaOutputBufferDoFn}.
  */
 public class GroupAlsoByWindowsParDoOperation extends ParDoOperation {
 
@@ -19,11 +37,9 @@ public class GroupAlsoByWindowsParDoOperation extends 
ParDoOperation {
 
   public GroupAlsoByWindowsParDoOperation(
       PipelineOptions options,
-      TupleTag<Object> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
       WindowingStrategy<?, ?> windowingStrategy,
       Coder<?> inputCoder) {
-    super(options, mainOutputTag, sideOutputTags, windowingStrategy);
+    super(options, new TupleTag<>(), ImmutableList.<TupleTag<?>>of(), 
windowingStrategy);
     this.inputCoder = checkNotNull(inputCoder, "inputCoder");
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 8ee616d..5ac23a5 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java
new file mode 100644
index 0000000..b0be494
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * A placeholder {@link Operation} that represents a GroupByKey during pipeline translation.
+ *
+ * <p>It only carries the {@link WindowingStrategy} and the {@link KvCoder} it was created
+ * with; {@link #process} always throws.
+ */
+public class GroupByKeyOperation<K, V> extends Operation<KV<K, V>> {
+
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final KvCoder<K, V> kvCoder;
+
+  /**
+   * Creates a {@link GroupByKeyOperation}.
+   *
+   * @param windowingStrategy the windowing strategy, must not be null
+   * @param kvCoder the key-value coder, must not be null
+   */
+  public GroupByKeyOperation(WindowingStrategy<?, ?> windowingStrategy, KvCoder<K, V> kvCoder) {
+    super(1);
+    this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+    this.kvCoder = checkNotNull(kvCoder, "kvCoder");
+  }
+
+  /**
+   * Always throws, because this placeholder is not meant to process elements.
+   *
+   * @throws IllegalStateException always
+   */
+  @Override
+  public void process(WindowedValue elem) {
+    throw new IllegalStateException(
+        String.format(
+            "%s should not be in the execution graph.", this.getClass().getSimpleName()));
+  }
+
+  /** Returns the {@link WindowingStrategy} given at construction. */
+  public WindowingStrategy<?, ?> getWindowingStrategy() {
+    return windowingStrategy;
+  }
+
+  /** Returns the {@link KvCoder} given at construction. */
+  public KvCoder<K, V> getKvCoder() {
+    return kvCoder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..e87ed09
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Translates a {@link GroupByKey} to {@link Operation Operations}.
+ */
+class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
+  /**
+   * Adds an init step holding a {@link GroupByKeyOperation} that is built from the windowing
+   * strategy and the coder of the input {@link PCollection}.
+   *
+   * @param transform the {@link GroupByKey} being translated (not read by this method)
+   * @param context the translation context that supplies the input and receives the step
+   */
+  @Override
+  public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    PCollection<?> inPCollection = (PCollection<?>) userGraphContext.getInput();
+    WindowingStrategy<?, ?> windowingStrategy = inPCollection.getWindowingStrategy();
+    Coder<?> inCoder = inPCollection.getCoder();
+
+    // Unchecked cast: assumes the input of a GroupByKey is encoded with a KvCoder whose key
+    // and value types match K and V. A non-KvCoder input fails here with ClassCastException.
+    @SuppressWarnings("unchecked")
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) inCoder;
+
+    GroupByKeyOperation<K, V> groupByKeyOperation =
+        new GroupByKeyOperation<>(windowingStrategy, kvCoder);
+    context.addInitStep(Graphs.Step.of(
+        userGraphContext.getStepName(),
+        groupByKeyOperation,
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags()));
+  }
+}

Reply via email to